1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28 soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29 soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::SinceHandle;
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::{Diff, RelationDesc, SqlScalarType};
41use mz_storage_types::StorageDiff;
42use mz_storage_types::sources::SourceData;
43use sha2::Digest;
44use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
45use tracing::{debug, info, warn};
46use uuid::Uuid;
47
48use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
49use crate::durable::error::FenceError;
50use crate::durable::initialize::{
51 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
52 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
53};
54use crate::durable::metrics::Metrics;
55use crate::durable::objects::state_update::{
56 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
57 TryIntoStateUpdateKind,
58};
59use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
60use crate::durable::transaction::TransactionBatch;
61use crate::durable::upgrade::upgrade;
62use crate::durable::{
63 AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
64 DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
65 ReadOnlyDurableCatalogState, Transaction, initialize,
66};
67use crate::memory;
68
/// The timestamp type used throughout this module; an alias for [`mz_repr::Timestamp`].
pub(crate) type Timestamp = mz_repr::Timestamp;

/// The epoch used as a starting point when no fence token has been observed in the catalog
/// (see `FenceableToken::generate_unfenced_token`).
// SAFETY: the literal `1` is non-zero.
// NOTE(review): this assumes non-zero is the only invariant `Epoch::new_unchecked` requires;
// confirm against the definition of `Epoch`.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Shard name reported in persist [`Diagnostics`] for the catalog shard.
const CATALOG_SHARD_NAME: &str = "catalog";
/// Shard name reported in persist [`Diagnostics`] for the catalog upgrade shard.
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

/// Seed passed to `shard_id` to derive the catalog shard ID.
const CATALOG_SEED: usize = 1;
/// Seed passed to `shard_id` to derive the catalog upgrade shard ID.
const UPGRADE_SEED: usize = 2;
/// NOTE(review): not referenced in the visible code. Judging by the name, this seed is
/// reserved for a builtin migration shard; confirm before reusing the value.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// NOTE(review): not referenced in the visible code. Judging by the name, this seed is
/// reserved for an expression cache shard; confirm before reusing the value.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
120
/// The mode in which a catalog handle operates.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// The handle syncs with the shard; `compare_and_append` asserts that it is never called
    /// in this mode.
    Readonly,
    /// The handle does not listen to the shard when syncing: `sync` only advances the
    /// in-memory upper, and `current_upper` reports the in-memory upper.
    Savepoint,
    /// The handle may append to the shard via `compare_and_append`. Only this mode increments
    /// the epoch when generating a fence token.
    Writable,
}
131
/// The fencing state of a catalog handle.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The handle has not yet generated its own fence token.
    Initializing {
        /// The greatest fence token observed in the catalog so far, if any.
        durable_token: Option<FenceToken>,
        /// The deploy generation supplied when this state was created, if any.
        current_deploy_generation: Option<u64>,
    },
    /// The handle holds `current_token` and has not observed a greater token.
    Unfenced { current_token: FenceToken },
    /// The handle has observed `fence_token`, which is greater than its own `current_token`.
    /// Validating this state always yields a [`FenceError`].
    Fenced {
        /// The token held by this handle when it was fenced.
        current_token: FenceToken,
        /// The greater token that fenced this handle.
        fence_token: FenceToken,
    },
}
151
152impl FenceableToken {
153 fn new(current_deploy_generation: Option<u64>) -> Self {
155 Self::Initializing {
156 durable_token: None,
157 current_deploy_generation,
158 }
159 }
160
161 fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
163 match self {
164 FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
165 FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
166 FenceableToken::Fenced {
167 current_token,
168 fence_token,
169 } => {
170 assert!(
171 fence_token > current_token,
172 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
173 );
174 if fence_token.deploy_generation > current_token.deploy_generation {
175 Err(FenceError::DeployGeneration {
176 current_generation: current_token.deploy_generation,
177 fence_generation: fence_token.deploy_generation,
178 })
179 } else {
180 assert!(
181 fence_token.epoch > current_token.epoch,
182 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
183 );
184 Err(FenceError::Epoch {
185 current_epoch: current_token.epoch,
186 fence_epoch: fence_token.epoch,
187 })
188 }
189 }
190 }
191 }
192
193 fn token(&self) -> Option<FenceToken> {
195 match self {
196 FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
197 FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
198 FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
199 }
200 }
201
202 fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
204 match self {
205 FenceableToken::Initializing {
206 durable_token,
207 current_deploy_generation,
208 ..
209 } => {
210 match durable_token {
211 Some(durable_token) => {
212 *durable_token = max(durable_token.clone(), token.clone());
213 }
214 None => {
215 *durable_token = Some(token.clone());
216 }
217 }
218 if let Some(current_deploy_generation) = current_deploy_generation {
219 if *current_deploy_generation < token.deploy_generation {
220 *self = FenceableToken::Fenced {
221 current_token: FenceToken {
222 deploy_generation: *current_deploy_generation,
223 epoch: token.epoch,
224 },
225 fence_token: token,
226 };
227 self.validate()?;
228 }
229 }
230 }
231 FenceableToken::Unfenced { current_token } => {
232 if *current_token < token {
233 *self = FenceableToken::Fenced {
234 current_token: current_token.clone(),
235 fence_token: token,
236 };
237 self.validate()?;
238 }
239 }
240 FenceableToken::Fenced { .. } => {
241 self.validate()?;
242 }
243 }
244
245 Ok(())
246 }
247
248 fn generate_unfenced_token(
252 &self,
253 mode: Mode,
254 ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
255 let (durable_token, current_deploy_generation) = match self {
256 FenceableToken::Initializing {
257 durable_token,
258 current_deploy_generation,
259 } => (durable_token.clone(), current_deploy_generation.clone()),
260 FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
261 };
262
263 let mut fence_updates = Vec::with_capacity(2);
264
265 if let Some(durable_token) = &durable_token {
266 fence_updates.push((
267 StateUpdateKind::FenceToken(durable_token.clone()),
268 Diff::MINUS_ONE,
269 ));
270 }
271
272 let current_deploy_generation = current_deploy_generation
273 .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
274 .ok_or(DurableCatalogError::Uninitialized)?;
276 let mut current_epoch = durable_token
277 .map(|token| token.epoch)
278 .unwrap_or(MIN_EPOCH)
279 .get();
280 if matches!(mode, Mode::Writable) {
282 current_epoch = current_epoch + 1;
283 }
284 let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
285 let current_token = FenceToken {
286 deploy_generation: current_deploy_generation,
287 epoch: current_epoch,
288 };
289
290 fence_updates.push((
291 StateUpdateKind::FenceToken(current_token.clone()),
292 Diff::ONE,
293 ));
294
295 let current_fenceable_token = FenceableToken::Unfenced { current_token };
296
297 Ok(Some((fence_updates, current_fenceable_token)))
298 }
299}
300
/// An error returned by `PersistHandle::compare_and_append`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// The handle has been fenced.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The upper the handle expected did not match the shard's actual upper.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        /// The upper that the write expected.
        expected_upper: Timestamp,
        /// The upper that the shard actually had.
        actual_upper: Timestamp,
    },
}
316
317impl CompareAndAppendError {
318 pub(crate) fn unwrap_fence_error(self) -> FenceError {
319 match self {
320 CompareAndAppendError::Fence(e) => e,
321 e @ CompareAndAppendError::UpperMismatch { .. } => {
322 panic!("unexpected upper mismatch: {e:?}")
323 }
324 }
325 }
326}
327
328impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
329 fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
330 Self::UpperMismatch {
331 expected_upper: antichain_to_timestamp(upper_mismatch.expected),
332 actual_upper: antichain_to_timestamp(upper_mismatch.current),
333 }
334 }
335}
336
/// Describes how a single catalog update is folded into a handle's in-memory state.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Processes `update`, possibly fencing `current_fence_token`.
    ///
    /// `PersistHandle::apply_updates` keeps the update in its in-memory snapshot when this
    /// returns `Ok(Some(_))` and discards it when this returns `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if applying the update reveals that the handle is fenced.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
348
/// A handle to the persist shard backing the durable catalog.
///
/// `T` is the representation of update kinds kept in memory and `U` decides how each update is
/// applied.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The [`Mode`] this handle operates in.
    pub(crate) mode: Mode,
    /// Critical since handle; `compare_and_append` uses it to downgrade the shard's since.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
    /// Write handle used to append updates and to fetch the shard's recent upper.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener from which `sync` pulls new updates and progress.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Client used to open additional handles (e.g. temporary readers).
    persist_client: PersistClient,
    /// ID of the catalog shard.
    shard_id: ShardId,
    /// In-memory copy of the updates retained by `update_applier`; `consolidate` keeps it
    /// consolidated at a single timestamp.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// Applies each incoming update and decides whether it is retained in `snapshot`.
    update_applier: U,
    /// The upper this handle has synced to.
    pub(crate) upper: Timestamp,
    /// Fencing state of this handle.
    fenceable_token: FenceableToken,
    /// The version supplied when the handle was created.
    catalog_content_version: semver::Version,
    /// NOTE(review): only ever initialized to `false` in the visible code; presumably flipped
    /// once bootstrap finishes — confirm.
    bootstrap_complete: bool,
    /// Metrics recorded by this handle.
    metrics: Arc<Metrics>,
}
393
394impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
395 async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
397 let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
398
399 let () = self
400 .persist_client
401 .upgrade_version::<(), (), Timestamp, StorageDiff>(
402 upgrade_shard_id,
403 Diagnostics {
404 shard_name: UPGRADE_SHARD_NAME.to_string(),
405 handle_purpose: "durable catalog state upgrade".to_string(),
406 },
407 )
408 .await
409 .expect("invalid usage");
410 }
411
412 #[mz_ore::instrument]
414 async fn current_upper(&mut self) -> Timestamp {
415 match self.mode {
416 Mode::Writable | Mode::Readonly => {
417 let upper = self.write_handle.fetch_recent_upper().await;
418 antichain_to_timestamp(upper.clone())
419 }
420 Mode::Savepoint => self.upper,
421 }
422 }
423
424 #[mz_ore::instrument]
428 pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
429 &mut self,
430 updates: Vec<(S, Diff)>,
431 commit_ts: Timestamp,
432 ) -> Result<Timestamp, CompareAndAppendError> {
433 assert_eq!(self.mode, Mode::Writable);
434 assert!(
435 commit_ts >= self.upper,
436 "expected commit ts, {}, to be greater than or equal to upper, {}",
437 commit_ts,
438 self.upper
439 );
440
441 let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
444 let updates: Vec<_> = updates.clone();
445 let parsed_updates: Vec<_> = updates
446 .clone()
447 .into_iter()
448 .map(|(update, diff)| {
449 let update: StateUpdateKindJson = update.into();
450 (update, diff)
451 })
452 .filter_map(|(update, diff)| {
453 <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
454 .ok()
455 .map(|update| (update, diff))
456 })
457 .collect();
458 let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
459 matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
460 });
461 let contains_addition = parsed_updates.iter().any(|(update, diff)| {
462 matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
463 });
464 let contains_fence = contains_retraction && contains_addition;
465 Some((contains_fence, updates))
466 } else {
467 None
468 };
469
470 let updates = updates.into_iter().map(|(kind, diff)| {
471 let kind: StateUpdateKindJson = kind.into();
472 (
473 (Into::<SourceData>::into(kind), ()),
474 commit_ts,
475 diff.into_inner(),
476 )
477 });
478 let next_upper = commit_ts.step_forward();
479 let res = self
480 .write_handle
481 .compare_and_append(
482 updates,
483 Antichain::from_elem(self.upper),
484 Antichain::from_elem(next_upper),
485 )
486 .await
487 .expect("invalid usage");
488
489 if let Err(e @ UpperMismatch { .. }) = res {
494 self.sync_to_current_upper().await?;
495 if let Some((contains_fence, updates)) = contains_fence {
496 assert!(
497 contains_fence,
498 "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
499 )
500 }
501 return Err(e.into());
502 }
503
504 let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
506
507 let opaque = *self.since_handle.opaque();
512 let downgrade = self
513 .since_handle
514 .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
515 .await;
516
517 match downgrade {
518 None => {}
519 Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
520 Some(Ok(updated)) => soft_assert_or_log!(
521 updated == downgrade_to,
522 "updated bound should match expected"
523 ),
524 }
525 self.sync(next_upper).await?;
526 Ok(next_upper)
527 }
528
529 #[mz_ore::instrument]
532 async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
533 let current_upper = self.current_upper().await;
534
535 let mut snapshot = Vec::new();
536 let mut read_handle = self.read_handle().await;
537 let as_of = as_of(&read_handle, current_upper);
538 let mut stream = Box::pin(
539 read_handle
541 .snapshot_and_stream(Antichain::from_elem(as_of))
542 .await
543 .expect("we have advanced the restart_as_of by the since"),
544 );
545 while let Some(update) = stream.next().await {
546 snapshot.push(update)
547 }
548 read_handle.expire().await;
549 snapshot
550 .into_iter()
551 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
552 .map(|state_update| state_update.try_into().expect("kind decoding error"))
553 .collect()
554 }
555
556 #[mz_ore::instrument]
560 pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
561 let upper = self.current_upper().await;
562 self.sync(upper).await
563 }
564
565 #[mz_ore::instrument(level = "debug")]
569 pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
570 self.metrics.syncs.inc();
571 let counter = self.metrics.sync_latency_seconds.clone();
572 self.sync_inner(target_upper)
573 .wall_time()
574 .inc_by(counter)
575 .await
576 }
577
    /// Pulls updates from the shard's listener and applies them until the in-memory upper
    /// reaches `target_upper`.
    ///
    /// # Errors
    ///
    /// Returns a [`FenceError`] if the handle is already fenced or becomes fenced while
    /// applying updates.
    ///
    /// # Panics
    ///
    /// Panics if an update fails to decode into `T`, if a buffered update's timestamp is not
    /// below the reported progress, or if updates remain buffered once the target is reached.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        // Refuse to do anything once fenced.
        self.fenceable_token.validate()?;

        // Savepoint handles do not listen to the shard; only the in-memory upper advances.
        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        // Updates received but not yet applied, grouped by timestamp.
        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Apply the buffered updates one timestamp at a time, in ascending
                        // timestamp order. Decoding into `T` happens lazily per batch, so a
                        // fence error from an earlier timestamp is returned before updates at
                        // later timestamps are decoded.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        // Buffer until the next progress event.
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }
630
631 #[mz_ore::instrument(level = "debug")]
632 pub(crate) fn apply_updates(
633 &mut self,
634 updates: impl IntoIterator<Item = StateUpdate<T>>,
635 ) -> Result<(), FenceError> {
636 let mut updates: Vec<_> = updates
637 .into_iter()
638 .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
639 .collect();
640
641 differential_dataflow::consolidation::consolidate_updates(&mut updates);
645
646 updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
649
650 let mut errors = Vec::new();
651
652 for (kind, ts, diff) in updates {
653 if diff != Diff::ONE && diff != Diff::MINUS_ONE {
654 panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
655 }
656
657 match self.update_applier.apply_update(
658 StateUpdate { kind, ts, diff },
659 &mut self.fenceable_token,
660 &self.metrics,
661 ) {
662 Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
663 Ok(None) => {}
664 Err(err) => errors.push(err),
667 }
668 }
669
670 errors.sort();
671 if let Some(err) = errors.into_iter().next() {
672 return Err(err);
673 }
674
675 self.consolidate();
676
677 Ok(())
678 }
679
680 #[mz_ore::instrument]
681 pub(crate) fn consolidate(&mut self) {
682 soft_assert_no_log!(
683 self.snapshot
684 .windows(2)
685 .all(|updates| updates[0].1 <= updates[1].1),
686 "snapshot should be sorted by timestamp, {:#?}",
687 self.snapshot
688 );
689
690 let new_ts = self
691 .snapshot
692 .last()
693 .map(|(_, ts, _)| *ts)
694 .unwrap_or_else(Timestamp::minimum);
695 for (_, ts, _) in &mut self.snapshot {
696 *ts = new_ts;
697 }
698 differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
699 }
700
701 async fn with_trace<R>(
705 &mut self,
706 f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
707 ) -> Result<R, CatalogError> {
708 self.sync_to_current_upper().await?;
709 f(&self.snapshot)
710 }
711
712 async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
714 self.persist_client
715 .open_leased_reader(
716 self.shard_id,
717 Arc::new(desc()),
718 Arc::new(UnitSchema::default()),
719 Diagnostics {
720 shard_name: CATALOG_SHARD_NAME.to_string(),
721 handle_purpose: "openable durable catalog state temporary reader".to_string(),
722 },
723 USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
724 )
725 .await
726 .expect("invalid usage")
727 }
728
729 async fn expire(self: Box<Self>) {
731 self.write_handle.expire().await;
732 self.listen.expire().await;
733 }
734}
735
736impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Syncs to the current upper, materializes the in-memory trace into a [`Snapshot`], and
    /// runs `f` on it, returning whatever `f` returns.
    ///
    /// # Errors
    ///
    /// Returns an error if syncing fails or if `f` fails.
    ///
    /// # Panics
    ///
    /// Panics if the trace contains a diff other than `1` or `-1`, an addition for a key that
    /// is already present, or a retraction that does not match the stored value.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        /// Applies a single addition (`diff == 1`) or retraction (`diff == -1`) of
        /// `key -> value` to `map`, asserting that the map is in the expected state. Any other
        /// diff is ignored here; the caller rejects such diffs beforehand.
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {
                        // Audit log entries are not materialized into the snapshot.
                    }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {
                        // Fence tokens are not materialized into the snapshot.
                    }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }
851
852 #[mz_ore::instrument(level = "debug")]
859 async fn persist_snapshot(
860 &mut self,
861 ) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
862 let mut read_handle = self.read_handle().await;
863 let as_of = as_of(&read_handle, self.upper);
864 let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
865 .await
866 .map(|update| update.try_into().expect("kind decoding error"));
867 read_handle.expire().await;
868 snapshot
869 }
870}
871
/// The state tracked by a catalog handle that has not been opened yet.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// ID of the organization this catalog belongs to; used to derive shard IDs.
    organization_id: Uuid,
    /// Current contents of the config collection, maintained from `Config` updates.
    configs: BTreeMap<String, u64>,
    /// Current contents of the setting collection, maintained from `Setting` updates.
    settings: BTreeMap<String, String>,
}
882
883impl UnopenedCatalogStateInner {
884 fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
885 UnopenedCatalogStateInner {
886 organization_id,
887 configs: BTreeMap::new(),
888 settings: BTreeMap::new(),
889 }
890 }
891}
892
893impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
894 fn apply_update(
895 &mut self,
896 update: StateUpdate<StateUpdateKindJson>,
897 current_fence_token: &mut FenceableToken,
898 _metrics: &Arc<Metrics>,
899 ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
900 if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
901 let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
902 match (kind, update.diff) {
903 (StateUpdateKind::Config(key, value), Diff::ONE) => {
904 let prev = self.configs.insert(key.key, value.value);
905 assert_eq!(
906 prev, None,
907 "values must be explicitly retracted before inserting a new value"
908 );
909 }
910 (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
911 let prev = self.configs.remove(&key.key);
912 assert_eq!(
913 prev,
914 Some(value.value),
915 "retraction does not match existing value"
916 );
917 }
918 (StateUpdateKind::Setting(key, value), Diff::ONE) => {
919 let prev = self.settings.insert(key.name, value.value);
920 assert_eq!(
921 prev, None,
922 "values must be explicitly retracted before inserting a new value"
923 );
924 }
925 (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
926 let prev = self.settings.remove(&key.name);
927 assert_eq!(
928 prev,
929 Some(value.value),
930 "retraction does not match existing value"
931 );
932 }
933 (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
934 current_fence_token.maybe_fence(fence_token)?;
935 }
936 _ => {}
937 }
938 }
939
940 Ok(Some(update))
941 }
942}
943
/// A persist-backed catalog handle that has not been opened yet. It keeps update kinds in
/// their JSON representation ([`StateUpdateKindJson`]) and applies them with
/// [`UnopenedCatalogStateInner`].
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
953
954impl UnopenedPersistCatalogState {
    /// Creates an unopened catalog handle for `organization_id` on top of `persist_client`.
    ///
    /// The handle is created in [`Mode::Writable`] with a snapshot of the catalog shard applied
    /// to its in-memory state.
    ///
    /// # Errors
    ///
    /// * [`DurableCatalogError::IncompatiblePersistVersion`] if `version` cannot write data at
    ///   the version recorded in the upgrade shard, or if the catalog content version found in
    ///   the catalog is greater than `version`.
    /// * A fence error if applying the snapshot reveals that this handle is fenced.
    ///
    /// # Panics
    ///
    /// Panics if persist reports invalid usage.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        debug!(
            ?catalog_shard_id,
            ?upgrade_shard_id,
            "new persist backed catalog state"
        );

        // Check the version recorded in the upgrade shard (if any) before opening any handle on
        // the catalog shard.
        let version_in_upgrade_shard =
            fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
        if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_upgrade_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_upgrade_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Try to advance the shard's upper from the minimum timestamp with an empty batch. If
        // the upper has already moved, the append fails with a mismatch and the shard's actual
        // upper is used instead.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        // Listen from the same as-of that the snapshot was taken at.
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Populated by `apply_updates` below.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(organization_id),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        // A consolidated snapshot contains additions only.
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Refuse to proceed if the catalog was written by a newer version than this one.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle.catalog_content_version < found_version {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1110
1111 #[mz_ore::instrument]
1112 async fn open_inner(
1113 mut self,
1114 mode: Mode,
1115 initial_ts: Timestamp,
1116 bootstrap_args: &BootstrapArgs,
1117 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1118 let mut commit_ts = self.upper;
1121 self.mode = mode;
1122
1123 match (&self.mode, &self.fenceable_token) {
1125 (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1126 return Err(DurableCatalogError::Internal(
1127 "catalog should not have fenced before opening".to_string(),
1128 )
1129 .into());
1130 }
1131 (
1132 Mode::Writable | Mode::Savepoint,
1133 FenceableToken::Initializing {
1134 current_deploy_generation: None,
1135 ..
1136 },
1137 ) => {
1138 return Err(DurableCatalogError::Internal(format!(
1139 "cannot open in mode '{:?}' without a deploy generation",
1140 self.mode,
1141 ))
1142 .into());
1143 }
1144 _ => {}
1145 }
1146
1147 let read_only = matches!(self.mode, Mode::Readonly);
1148
1149 loop {
1151 self.sync_to_current_upper().await?;
1152 commit_ts = max(commit_ts, self.upper);
1153 let (fence_updates, current_fenceable_token) = self
1154 .fenceable_token
1155 .generate_unfenced_token(self.mode)?
1156 .ok_or_else(|| {
1157 DurableCatalogError::Internal(
1158 "catalog should not have fenced before opening".to_string(),
1159 )
1160 })?;
1161 debug!(
1162 ?self.upper,
1163 ?self.fenceable_token,
1164 ?current_fenceable_token,
1165 "fencing previous catalogs"
1166 );
1167 if matches!(self.mode, Mode::Writable) {
1168 match self
1169 .compare_and_append(fence_updates.clone(), commit_ts)
1170 .await
1171 {
1172 Ok(upper) => {
1173 commit_ts = upper;
1174 }
1175 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1176 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1177 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1178 continue;
1179 }
1180 }
1181 }
1182 self.fenceable_token = current_fenceable_token;
1183 break;
1184 }
1185
1186 let is_initialized = self.is_initialized_inner();
1187 if !matches!(self.mode, Mode::Writable) && !is_initialized {
1188 return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1189 format!(
1190 "catalog tables do not exist; will not create in {:?} mode",
1191 self.mode
1192 ),
1193 )));
1194 }
1195 soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1196
1197 let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1199 .snapshot
1200 .into_iter()
1201 .partition(|(update, _, _)| update.is_audit_log());
1202 self.snapshot = snapshot;
1203 let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1204 let audit_log_handle = AuditLogIterator::new(audit_logs);
1205
1206 if is_initialized && !read_only {
1208 commit_ts = upgrade(&mut self, commit_ts).await?;
1209 }
1210
1211 debug!(
1212 ?is_initialized,
1213 ?self.upper,
1214 "initializing catalog state"
1215 );
1216 let mut catalog = PersistCatalogState {
1217 mode: self.mode,
1218 since_handle: self.since_handle,
1219 write_handle: self.write_handle,
1220 listen: self.listen,
1221 persist_client: self.persist_client,
1222 shard_id: self.shard_id,
1223 upper: self.upper,
1224 fenceable_token: self.fenceable_token,
1225 snapshot: Vec::new(),
1227 update_applier: CatalogStateInner::new(),
1228 catalog_content_version: self.catalog_content_version,
1229 bootstrap_complete: false,
1230 metrics: self.metrics,
1231 };
1232 catalog.metrics.collection_entries.reset();
1233 catalog
1236 .metrics
1237 .collection_entries
1238 .with_label_values(&[&CollectionType::AuditLog.to_string()])
1239 .add(audit_log_count.into_inner());
1240 let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1241 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1242 StateUpdate { kind, ts, diff }
1243 });
1244 catalog.apply_updates(updates)?;
1245
1246 let catalog_content_version = catalog.catalog_content_version.to_string();
1247 let txn = if is_initialized {
1248 let mut txn = catalog.transaction().await?;
1249
1250 if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1257 let old_version = txn.get_catalog_content_version();
1258 txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1259 }
1260
1261 txn.set_catalog_content_version(catalog_content_version)?;
1262 txn
1263 } else {
1264 soft_assert_eq_no_log!(
1265 catalog
1266 .snapshot
1267 .iter()
1268 .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1269 .count(),
1270 0,
1271 "trace should not contain any updates for an uninitialized catalog: {:#?}",
1272 catalog.snapshot
1273 );
1274
1275 let mut txn = catalog.transaction().await?;
1276 initialize::initialize(
1277 &mut txn,
1278 bootstrap_args,
1279 initial_ts.into(),
1280 catalog_content_version,
1281 )
1282 .await?;
1283 txn
1284 };
1285
1286 if read_only {
1287 let (txn_batch, _) = txn.into_parts();
1288 let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1290 catalog.apply_updates(updates)?;
1291 } else {
1292 txn.commit_internal(commit_ts).await?;
1293 }
1294
1295 if matches!(catalog.mode, Mode::Writable) {
1299 catalog
1300 .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
1301 .await;
1302
1303 let write_handle = catalog
1304 .persist_client
1305 .open_writer::<SourceData, (), Timestamp, i64>(
1306 catalog.write_handle.shard_id(),
1307 Arc::new(desc()),
1308 Arc::new(UnitSchema::default()),
1309 Diagnostics {
1310 shard_name: CATALOG_SHARD_NAME.to_string(),
1311 handle_purpose: "compact catalog".to_string(),
1312 },
1313 )
1314 .await
1315 .expect("invalid usage");
1316 let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1317 let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1318 let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1321 let () =
1322 mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1323 &write_handle,
1324 || fuel.get(),
1325 || wait.get(),
1326 )
1327 .await;
1328 });
1329 }
1330
1331 Ok((Box::new(catalog), audit_log_handle))
1332 }
1333
1334 #[mz_ore::instrument]
1339 fn is_initialized_inner(&self) -> bool {
1340 !self.update_applier.configs.is_empty()
1341 }
1342
1343 #[mz_ore::instrument]
1347 async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1348 self.sync_to_current_upper().await?;
1349 Ok(self.update_applier.configs.get(key).cloned())
1350 }
1351
1352 #[mz_ore::instrument]
1356 pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1357 self.get_current_config(USER_VERSION_KEY).await
1358 }
1359
1360 #[mz_ore::instrument]
1364 async fn get_current_setting(
1365 &mut self,
1366 name: &str,
1367 ) -> Result<Option<String>, DurableCatalogError> {
1368 self.sync_to_current_upper().await?;
1369 Ok(self.update_applier.settings.get(name).cloned())
1370 }
1371
1372 #[mz_ore::instrument]
1377 async fn get_catalog_content_version(
1378 &mut self,
1379 ) -> Result<Option<semver::Version>, DurableCatalogError> {
1380 let version = self
1381 .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1382 .await?;
1383 let version = version.map(|version| version.parse().expect("invalid version persisted"));
1384 Ok(version)
1385 }
1386}
1387
1388#[async_trait]
1389impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1390 #[mz_ore::instrument]
1391 async fn open_savepoint(
1392 mut self: Box<Self>,
1393 initial_ts: Timestamp,
1394 bootstrap_args: &BootstrapArgs,
1395 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1396 self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1397 .boxed()
1398 .await
1399 }
1400
1401 #[mz_ore::instrument]
1402 async fn open_read_only(
1403 mut self: Box<Self>,
1404 bootstrap_args: &BootstrapArgs,
1405 ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1406 self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1407 .boxed()
1408 .await
1409 .map(|(catalog, _)| catalog)
1410 }
1411
1412 #[mz_ore::instrument]
1413 async fn open(
1414 mut self: Box<Self>,
1415 initial_ts: Timestamp,
1416 bootstrap_args: &BootstrapArgs,
1417 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1418 self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1419 .boxed()
1420 .await
1421 }
1422
1423 #[mz_ore::instrument(level = "debug")]
1424 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1425 Ok(DebugCatalogState(*self))
1426 }
1427
1428 #[mz_ore::instrument]
1429 async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1430 self.sync_to_current_upper().await?;
1431 Ok(self.is_initialized_inner())
1432 }
1433
1434 #[mz_ore::instrument]
1435 async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1436 self.sync_to_current_upper().await?;
1437 self.fenceable_token
1438 .validate()?
1439 .map(|token| token.epoch)
1440 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1441 }
1442
1443 #[mz_ore::instrument]
1444 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1445 self.sync_to_current_upper().await?;
1446 self.fenceable_token
1447 .token()
1448 .map(|token| token.deploy_generation)
1449 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1450 }
1451
1452 #[mz_ore::instrument(level = "debug")]
1453 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1454 let value = self
1455 .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1456 .await?;
1457 match value {
1458 None => Ok(None),
1459 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1460 }
1461 }
1462
1463 #[mz_ore::instrument(level = "debug")]
1464 async fn get_0dt_deployment_ddl_check_interval(
1465 &mut self,
1466 ) -> Result<Option<Duration>, CatalogError> {
1467 let value = self
1468 .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1469 .await?;
1470 match value {
1471 None => Ok(None),
1472 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1473 }
1474 }
1475
1476 #[mz_ore::instrument(level = "debug")]
1477 async fn get_enable_0dt_deployment_panic_after_timeout(
1478 &mut self,
1479 ) -> Result<Option<bool>, CatalogError> {
1480 let value = self
1481 .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1482 .await?;
1483 match value {
1484 None => Ok(None),
1485 Some(0) => Ok(Some(false)),
1486 Some(1) => Ok(Some(true)),
1487 Some(v) => Err(
1488 DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1489 "{v} is not a valid boolean value"
1490 )))
1491 .into(),
1492 ),
1493 }
1494 }
1495
1496 #[mz_ore::instrument]
1497 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1498 self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1499 .await
1500 .map(|value| value.map(|value| value > 0).unwrap_or(false))
1501 }
1502
1503 #[mz_ore::instrument]
1504 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1505 self.sync_to_current_upper().await?;
1506 if self.is_initialized_inner() {
1507 let snapshot = self.snapshot_unconsolidated().await;
1508 Ok(Trace::from_snapshot(snapshot))
1509 } else {
1510 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1511 }
1512 }
1513
1514 #[mz_ore::instrument]
1515 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1516 self.sync_to_current_upper().await?;
1517 if self.is_initialized_inner() {
1518 let snapshot = self.current_snapshot().await?;
1519 Ok(Trace::from_snapshot(snapshot))
1520 } else {
1521 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1522 }
1523 }
1524
1525 #[mz_ore::instrument(level = "debug")]
1526 async fn expire(self: Box<Self>) {
1527 self.expire().await
1528 }
1529}
1530
1531#[derive(Debug)]
1533struct CatalogStateInner {
1534 updates: VecDeque<memory::objects::StateUpdate>,
1536}
1537
1538impl CatalogStateInner {
1539 fn new() -> CatalogStateInner {
1540 CatalogStateInner {
1541 updates: VecDeque::new(),
1542 }
1543 }
1544}
1545
1546impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1547 fn apply_update(
1548 &mut self,
1549 update: StateUpdate<StateUpdateKind>,
1550 current_fence_token: &mut FenceableToken,
1551 metrics: &Arc<Metrics>,
1552 ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1553 if let Some(collection_type) = update.kind.collection_type() {
1554 metrics
1555 .collection_entries
1556 .with_label_values(&[&collection_type.to_string()])
1557 .add(update.diff.into_inner());
1558 }
1559
1560 {
1561 let update: Option<memory::objects::StateUpdate> = (&update)
1562 .try_into()
1563 .expect("invalid persisted update: {update:#?}");
1564 if let Some(update) = update {
1565 self.updates.push_back(update);
1566 }
1567 }
1568
1569 match (update.kind, update.diff) {
1570 (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1571 (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1573 (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1574 current_fence_token.maybe_fence(token)?;
1575 Ok(None)
1576 }
1577 (kind, diff) => Ok(Some(StateUpdate {
1578 kind,
1579 ts: update.ts,
1580 diff,
1581 })),
1582 }
1583 }
1584}
1585
/// A [`PersistHandle`] specialized to decoded [`StateUpdateKind`] updates with
/// [`CatalogStateInner`] as its update applier. This is the handle type that `open_inner`
/// constructs for an opened catalog.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1592
1593#[async_trait]
1594impl ReadOnlyDurableCatalogState for PersistCatalogState {
1595 fn epoch(&self) -> Epoch {
1596 self.fenceable_token
1597 .token()
1598 .expect("opened catalog state must have an epoch")
1599 .epoch
1600 }
1601
1602 #[mz_ore::instrument(level = "debug")]
1603 async fn expire(self: Box<Self>) {
1604 self.expire().await
1605 }
1606
1607 fn is_bootstrap_complete(&self) -> bool {
1608 self.bootstrap_complete
1609 }
1610
1611 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1612 self.sync_to_current_upper().await?;
1613 let audit_logs: Vec<_> = self
1614 .persist_snapshot()
1615 .await
1616 .filter_map(
1617 |StateUpdate {
1618 kind,
1619 ts: _,
1620 diff: _,
1621 }| match kind {
1622 StateUpdateKind::AuditLog(key, ()) => Some(key),
1623 _ => None,
1624 },
1625 )
1626 .collect();
1627 let mut audit_logs: Vec<_> = audit_logs
1628 .into_iter()
1629 .map(RustType::from_proto)
1630 .map_ok(|key: AuditLogKey| key.event)
1631 .collect::<Result<_, _>>()?;
1632 audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1633 Ok(audit_logs)
1634 }
1635
1636 #[mz_ore::instrument(level = "debug")]
1637 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1638 self.with_trace(|trace| {
1639 Ok(trace
1640 .into_iter()
1641 .rev()
1642 .filter_map(|(kind, _, _)| match kind {
1643 StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1644 Some(value.next_id)
1645 }
1646 _ => None,
1647 })
1648 .next()
1649 .expect("must exist"))
1650 })
1651 .await
1652 }
1653
1654 #[mz_ore::instrument(level = "debug")]
1655 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1656 self.sync_to_current_upper().await?;
1657 Ok(self
1658 .fenceable_token
1659 .token()
1660 .expect("opened catalogs must have a token")
1661 .deploy_generation)
1662 }
1663
1664 #[mz_ore::instrument(level = "debug")]
1665 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1666 self.with_snapshot(Ok).await
1667 }
1668
1669 #[mz_ore::instrument(level = "debug")]
1670 async fn sync_to_current_updates(
1671 &mut self,
1672 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1673 let upper = self.current_upper().await;
1674 self.sync_updates(upper).await
1675 }
1676
1677 #[mz_ore::instrument(level = "debug")]
1678 async fn sync_updates(
1679 &mut self,
1680 target_upper: mz_repr::Timestamp,
1681 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1682 self.sync(target_upper).await?;
1683 let mut updates = Vec::new();
1684 while let Some(update) = self.update_applier.updates.front() {
1685 if update.ts >= target_upper {
1686 break;
1687 }
1688
1689 let update = self
1690 .update_applier
1691 .updates
1692 .pop_front()
1693 .expect("peeked above");
1694 updates.push(update);
1695 }
1696 Ok(updates)
1697 }
1698
1699 async fn current_upper(&mut self) -> Timestamp {
1700 self.current_upper().await
1701 }
1702}
1703
1704#[async_trait]
1705#[allow(mismatched_lifetime_syntaxes)]
1706impl DurableCatalogState for PersistCatalogState {
1707 fn is_read_only(&self) -> bool {
1708 matches!(self.mode, Mode::Readonly)
1709 }
1710
1711 fn is_savepoint(&self) -> bool {
1712 matches!(self.mode, Mode::Savepoint)
1713 }
1714
1715 async fn mark_bootstrap_complete(&mut self) {
1716 self.bootstrap_complete = true;
1717 if matches!(self.mode, Mode::Writable) {
1718 self.since_handle
1719 .upgrade_version()
1720 .await
1721 .expect("invalid usage")
1722 }
1723 }
1724
1725 #[mz_ore::instrument(level = "debug")]
1726 async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
1727 self.metrics.transactions_started.inc();
1728 let snapshot = self.snapshot().await?;
1729 let commit_ts = self.upper.clone();
1730 Transaction::new(self, snapshot, commit_ts)
1731 }
1732
1733 #[mz_ore::instrument(level = "debug")]
1734 async fn commit_transaction(
1735 &mut self,
1736 txn_batch: TransactionBatch,
1737 commit_ts: Timestamp,
1738 ) -> Result<Timestamp, CatalogError> {
1739 async fn commit_transaction_inner(
1740 catalog: &mut PersistCatalogState,
1741 txn_batch: TransactionBatch,
1742 commit_ts: Timestamp,
1743 ) -> Result<Timestamp, CatalogError> {
1744 assert_eq!(
1749 catalog.upper, txn_batch.upper,
1750 "only one transaction at a time is supported"
1751 );
1752
1753 assert!(
1754 commit_ts >= catalog.upper,
1755 "expected commit ts, {}, to be greater than or equal to upper, {}",
1756 commit_ts,
1757 catalog.upper
1758 );
1759
1760 let updates = StateUpdate::from_txn_batch(txn_batch).collect();
1761 debug!("committing updates: {updates:?}");
1762
1763 let next_upper = match catalog.mode {
1764 Mode::Writable => catalog
1765 .compare_and_append(updates, commit_ts)
1766 .await
1767 .map_err(|e| e.unwrap_fence_error())?,
1768 Mode::Savepoint => {
1769 let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
1770 kind,
1771 ts: commit_ts,
1772 diff,
1773 });
1774 catalog.apply_updates(updates)?;
1775 catalog.upper = commit_ts.step_forward();
1776 catalog.upper
1777 }
1778 Mode::Readonly => {
1779 if !updates.is_empty() {
1783 return Err(DurableCatalogError::NotWritable(format!(
1784 "cannot commit a transaction in a read-only catalog: {updates:#?}"
1785 ))
1786 .into());
1787 }
1788 catalog.upper
1789 }
1790 };
1791
1792 Ok(next_upper)
1793 }
1794 self.metrics.transaction_commits.inc();
1795 let counter = self.metrics.transaction_commit_latency_seconds.clone();
1796 commit_transaction_inner(self, txn_batch, commit_ts)
1797 .wall_time()
1798 .inc_by(counter)
1799 .await
1800 }
1801
1802 #[mz_ore::instrument(level = "debug")]
1803 async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
1804 if self.is_read_only() {
1806 return Ok(());
1807 }
1808 self.sync_to_current_upper().await?;
1809 Ok(())
1810 }
1811}
1812
1813pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1815 let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1816 soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1817 let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1818 ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1819}
1820
1821fn desc() -> RelationDesc {
1824 RelationDesc::builder()
1825 .with_column("data", SqlScalarType::Jsonb.nullable(false))
1826 .finish()
1827}
1828
1829fn as_of(
1832 read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1833 upper: Timestamp,
1834) -> Timestamp {
1835 let since = read_handle.since().clone();
1836 let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1837 panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1838 });
1839 soft_assert_or_log!(
1842 since.less_equal(&as_of),
1843 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1844 );
1845 as_of.advance_by(since.borrow());
1848 as_of
1849}
1850
1851async fn fetch_catalog_upgrade_shard_version(
1854 persist_client: &PersistClient,
1855 upgrade_shard_id: ShardId,
1856) -> Option<semver::Version> {
1857 let shard_state = persist_client
1858 .inspect_shard::<Timestamp>(&upgrade_shard_id)
1859 .await
1860 .ok()?;
1861 let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1862 let upgrade_version = json_state
1863 .get("applier_version")
1864 .cloned()
1865 .expect("missing applier_version");
1866 let upgrade_version =
1867 serde_json::from_value(upgrade_version).expect("version deserialization error");
1868 Some(upgrade_version)
1869}
1870
1871#[mz_ore::instrument(level = "debug")]
1876async fn snapshot_binary(
1877 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1878 as_of: Timestamp,
1879 metrics: &Arc<Metrics>,
1880) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1881 metrics.snapshots_taken.inc();
1882 let counter = metrics.snapshot_latency_seconds.clone();
1883 snapshot_binary_inner(read_handle, as_of)
1884 .wall_time()
1885 .inc_by(counter)
1886 .await
1887}
1888
1889#[mz_ore::instrument(level = "debug")]
1894async fn snapshot_binary_inner(
1895 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1896 as_of: Timestamp,
1897) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1898 let snapshot = read_handle
1899 .snapshot_and_fetch(Antichain::from_elem(as_of))
1900 .await
1901 .expect("we have advanced the restart_as_of by the since");
1902 soft_assert_no_log!(
1903 snapshot.iter().all(|(_, _, diff)| *diff == 1),
1904 "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1905 );
1906 snapshot
1907 .into_iter()
1908 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1909 .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1910}
1911
1912pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
1917 antichain
1918 .into_option()
1919 .expect("we use a totally ordered time and never finalize the shard")
1920}
1921
1922impl Trace {
1925 fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
1927 let mut trace = Trace::new();
1928 for StateUpdate { kind, ts, diff } in snapshot {
1929 match kind {
1930 StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
1931 StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
1932 StateUpdateKind::ClusterReplica(k, v) => {
1933 trace.cluster_replicas.values.push(((k, v), ts, diff))
1934 }
1935 StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
1936 StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
1937 StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
1938 StateUpdateKind::DefaultPrivilege(k, v) => {
1939 trace.default_privileges.values.push(((k, v), ts, diff))
1940 }
1941 StateUpdateKind::FenceToken(_) => {
1942 }
1944 StateUpdateKind::IdAllocator(k, v) => {
1945 trace.id_allocator.values.push(((k, v), ts, diff))
1946 }
1947 StateUpdateKind::IntrospectionSourceIndex(k, v) => {
1948 trace.introspection_sources.values.push(((k, v), ts, diff))
1949 }
1950 StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
1951 StateUpdateKind::NetworkPolicy(k, v) => {
1952 trace.network_policies.values.push(((k, v), ts, diff))
1953 }
1954 StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
1955 StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
1956 StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
1957 StateUpdateKind::SourceReferences(k, v) => {
1958 trace.source_references.values.push(((k, v), ts, diff))
1959 }
1960 StateUpdateKind::SystemConfiguration(k, v) => {
1961 trace.system_configurations.values.push(((k, v), ts, diff))
1962 }
1963 StateUpdateKind::SystemObjectMapping(k, v) => {
1964 trace.system_object_mappings.values.push(((k, v), ts, diff))
1965 }
1966 StateUpdateKind::SystemPrivilege(k, v) => {
1967 trace.system_privileges.values.push(((k, v), ts, diff))
1968 }
1969 StateUpdateKind::StorageCollectionMetadata(k, v) => trace
1970 .storage_collection_metadata
1971 .values
1972 .push(((k, v), ts, diff)),
1973 StateUpdateKind::UnfinalizedShard(k, ()) => {
1974 trace.unfinalized_shards.values.push(((k, ()), ts, diff))
1975 }
1976 StateUpdateKind::TxnWalShard((), v) => {
1977 trace.txn_wal_shard.values.push((((), v), ts, diff))
1978 }
1979 StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
1980 }
1981 }
1982 trace
1983 }
1984}
1985
1986impl UnopenedPersistCatalogState {
1987 #[mz_ore::instrument]
1989 pub(crate) async fn debug_edit<T: Collection>(
1990 &mut self,
1991 key: T::Key,
1992 value: T::Value,
1993 ) -> Result<Option<T::Value>, CatalogError>
1994 where
1995 T::Key: PartialEq + Eq + Debug + Clone,
1996 T::Value: Debug + Clone,
1997 {
1998 let prev_value = loop {
1999 let key = key.clone();
2000 let value = value.clone();
2001 let snapshot = self.current_snapshot().await?;
2002 let trace = Trace::from_snapshot(snapshot);
2003 let collection_trace = T::collection_trace(trace);
2004 let prev_values: Vec<_> = collection_trace
2005 .values
2006 .into_iter()
2007 .filter(|((k, _), _, diff)| {
2008 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2009 &key == k
2010 })
2011 .collect();
2012
2013 let prev_value = match &prev_values[..] {
2014 [] => None,
2015 [((_, v), _, _)] => Some(v.clone()),
2016 prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
2017 };
2018
2019 let mut updates: Vec<_> = prev_values
2020 .into_iter()
2021 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2022 .collect();
2023 updates.push((T::update(key, value), Diff::ONE));
2024 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2026 Some((fence_updates, current_fenceable_token)) => {
2027 updates.extend(fence_updates.clone());
2028 match self.compare_and_append(updates, self.upper).await {
2029 Ok(_) => {
2030 self.fenceable_token = current_fenceable_token;
2031 break prev_value;
2032 }
2033 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2034 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2035 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2036 continue;
2037 }
2038 }
2039 }
2040 None => {
2041 self.compare_and_append(updates, self.upper)
2042 .await
2043 .map_err(|e| e.unwrap_fence_error())?;
2044 break prev_value;
2045 }
2046 }
2047 };
2048 Ok(prev_value)
2049 }
2050
2051 #[mz_ore::instrument]
2053 pub(crate) async fn debug_delete<T: Collection>(
2054 &mut self,
2055 key: T::Key,
2056 ) -> Result<(), CatalogError>
2057 where
2058 T::Key: PartialEq + Eq + Debug + Clone,
2059 T::Value: Debug,
2060 {
2061 loop {
2062 let key = key.clone();
2063 let snapshot = self.current_snapshot().await?;
2064 let trace = Trace::from_snapshot(snapshot);
2065 let collection_trace = T::collection_trace(trace);
2066 let mut retractions: Vec<_> = collection_trace
2067 .values
2068 .into_iter()
2069 .filter(|((k, _), _, diff)| {
2070 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2071 &key == k
2072 })
2073 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2074 .collect();
2075
2076 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2078 Some((fence_updates, current_fenceable_token)) => {
2079 retractions.extend(fence_updates.clone());
2080 match self.compare_and_append(retractions, self.upper).await {
2081 Ok(_) => {
2082 self.fenceable_token = current_fenceable_token;
2083 break;
2084 }
2085 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2086 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2087 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2088 continue;
2089 }
2090 }
2091 }
2092 None => {
2093 self.compare_and_append(retractions, self.upper)
2094 .await
2095 .map_err(|e| e.unwrap_fence_error())?;
2096 break;
2097 }
2098 }
2099 }
2100 Ok(())
2101 }
2102
2103 async fn current_snapshot(
2108 &mut self,
2109 ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2110 self.sync_to_current_upper().await?;
2111 self.consolidate();
2112 Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2113 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2114 StateUpdate { kind, ts, diff }
2115 }))
2116 }
2117}