1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28 soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29 soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58 TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64 AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66 ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
/// The timestamp type used for the durable catalog's persist shard.
pub(crate) type Timestamp = mz_repr::Timestamp;
72
/// The smallest valid epoch; used as the starting epoch when no durable token exists yet.
// SAFETY: `1` is non-zero, which is the only invariant `new_unchecked` requires.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };
78
/// Human-readable name of the catalog shard, used in persist `Diagnostics`.
const CATALOG_SHARD_NAME: &str = "catalog";
81
82static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
85 "c55555555-6666-7777-8888-999999999999"
86 .parse()
87 .expect("valid CriticalReaderId")
88});
89
/// Seed mixed into the organization id to derive the catalog shard id (see `shard_id(...)`).
const CATALOG_SEED: usize = 1;
/// Seed for a separate shard; leading underscore indicates it is currently unused here,
/// presumably retained so the seed value is never reused — TODO confirm.
const _UPGRADE_SEED: usize = 2;
/// Seed for a separate shard; unused in this file (note the leading underscore).
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// Seed for a separate shard; unused in this file (note the leading underscore).
pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
/// The mode in which the catalog is opened, controlling whether writes are durable.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// Reads the durable state but never appends to the shard
    /// (`compare_and_append_inner` asserts `Writable`).
    Readonly,
    /// Writes are tracked in memory only; `sync_inner` advances the in-memory
    /// `upper` directly instead of consuming the listener.
    Savepoint,
    /// Full read/write access to the durable shard.
    Writable,
}
109
/// Tracks this process's fence token and whether a higher token observed in the
/// shard has fenced this process out.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// We have not yet written our own token; we are still discovering the
    /// largest token currently in the shard.
    Initializing {
        /// The largest token read from the shard so far, if any.
        durable_token: Option<FenceToken>,
        /// This process's deploy generation, if known.
        current_deploy_generation: Option<u64>,
    },
    /// `current_token` is our own token and no larger token has been observed.
    Unfenced { current_token: FenceToken },
    /// A strictly larger `fence_token` was observed; this process is fenced out.
    Fenced {
        /// Our own token.
        current_token: FenceToken,
        /// The larger token that fenced us.
        fence_token: FenceToken,
    },
}
129
130impl FenceableToken {
131 fn new(current_deploy_generation: Option<u64>) -> Self {
133 Self::Initializing {
134 durable_token: None,
135 current_deploy_generation,
136 }
137 }
138
139 fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
141 match self {
142 FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
143 FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
144 FenceableToken::Fenced {
145 current_token,
146 fence_token,
147 } => {
148 assert!(
149 fence_token > current_token,
150 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
151 );
152 if fence_token.deploy_generation > current_token.deploy_generation {
153 Err(FenceError::DeployGeneration {
154 current_generation: current_token.deploy_generation,
155 fence_generation: fence_token.deploy_generation,
156 })
157 } else {
158 assert!(
159 fence_token.epoch > current_token.epoch,
160 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
161 );
162 Err(FenceError::Epoch {
163 current_epoch: current_token.epoch,
164 fence_epoch: fence_token.epoch,
165 })
166 }
167 }
168 }
169 }
170
171 fn token(&self) -> Option<FenceToken> {
173 match self {
174 FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
175 FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
176 FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
177 }
178 }
179
180 fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
182 match self {
183 FenceableToken::Initializing {
184 durable_token,
185 current_deploy_generation,
186 ..
187 } => {
188 match durable_token {
189 Some(durable_token) => {
190 *durable_token = max(durable_token.clone(), token.clone());
191 }
192 None => {
193 *durable_token = Some(token.clone());
194 }
195 }
196 if let Some(current_deploy_generation) = current_deploy_generation {
197 if *current_deploy_generation < token.deploy_generation {
198 *self = FenceableToken::Fenced {
199 current_token: FenceToken {
200 deploy_generation: *current_deploy_generation,
201 epoch: token.epoch,
202 },
203 fence_token: token,
204 };
205 self.validate()?;
206 }
207 }
208 }
209 FenceableToken::Unfenced { current_token } => {
210 if *current_token < token {
211 *self = FenceableToken::Fenced {
212 current_token: current_token.clone(),
213 fence_token: token,
214 };
215 self.validate()?;
216 }
217 }
218 FenceableToken::Fenced { .. } => {
219 self.validate()?;
220 }
221 }
222
223 Ok(())
224 }
225
226 fn generate_unfenced_token(
230 &self,
231 mode: Mode,
232 ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
233 let (durable_token, current_deploy_generation) = match self {
234 FenceableToken::Initializing {
235 durable_token,
236 current_deploy_generation,
237 } => (durable_token.clone(), current_deploy_generation.clone()),
238 FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
239 };
240
241 let mut fence_updates = Vec::with_capacity(2);
242
243 if let Some(durable_token) = &durable_token {
244 fence_updates.push((
245 StateUpdateKind::FenceToken(durable_token.clone()),
246 Diff::MINUS_ONE,
247 ));
248 }
249
250 let current_deploy_generation = current_deploy_generation
251 .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
252 .ok_or(DurableCatalogError::Uninitialized)?;
254 let mut current_epoch = durable_token
255 .map(|token| token.epoch)
256 .unwrap_or(MIN_EPOCH)
257 .get();
258 if matches!(mode, Mode::Writable) {
260 current_epoch = current_epoch + 1;
261 }
262 let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
263 let current_token = FenceToken {
264 deploy_generation: current_deploy_generation,
265 epoch: current_epoch,
266 };
267
268 fence_updates.push((
269 StateUpdateKind::FenceToken(current_token.clone()),
270 Diff::ONE,
271 ));
272
273 let current_fenceable_token = FenceableToken::Unfenced { current_token };
274
275 Ok(Some((fence_updates, current_fenceable_token)))
276 }
277}
278
/// Errors that can occur when appending updates to the catalog shard.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// Another catalog instance fenced this one out.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The shard's upper advanced past what we expected; callers may sync and
    /// retry (see `compare_and_append_inner`, which syncs before returning this).
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}
294
295impl CompareAndAppendError {
296 pub(crate) fn unwrap_fence_error(self) -> FenceError {
297 match self {
298 CompareAndAppendError::Fence(e) => e,
299 e @ CompareAndAppendError::UpperMismatch { .. } => {
300 panic!("unexpected upper mismatch: {e:?}")
301 }
302 }
303 }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307 fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308 Self::UpperMismatch {
309 expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310 actual_upper: antichain_to_timestamp(upper_mismatch.current),
311 }
312 }
313}
314
/// Hook for flavor-specific handling of updates read from the catalog shard.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Processes a single `update`, advancing `current_fence_token` when the
    /// update carries a fence token. Returns `Ok(Some(update))` when the update
    /// should be retained in the in-memory snapshot, `Ok(None)` when it should
    /// be dropped, and `Err` when the update fences out this process.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
326
327#[derive(Debug)]
341pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
342 pub(crate) mode: Mode,
344 since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
346 write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
348 listen: Listen<SourceData, (), Timestamp, StorageDiff>,
350 persist_client: PersistClient,
352 shard_id: ShardId,
354 pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
358 update_applier: U,
360 pub(crate) upper: Timestamp,
362 fenceable_token: FenceableToken,
364 catalog_content_version: semver::Version,
366 bootstrap_complete: bool,
368 metrics: Arc<Metrics>,
370}
371
impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
    /// Returns the current upper of the catalog shard. In savepoint mode the
    /// in-memory upper is authoritative, since writes are not durable.
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

    /// Appends `updates` to the catalog shard at `commit_ts`, then syncs the
    /// in-memory state up to the new upper. Returns the new upper on success.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        // Under soft assertions, precompute whether this batch swaps the fence
        // token (a retraction AND an addition), so that an upper mismatch on a
        // non-fencing write can be flagged below.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .filter_map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    let update = TryIntoStateUpdateKind::try_into(update).ok()?;
                    Some((update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            Some(contains_retraction && contains_addition)
        } else {
            None
        };

        // Encode the updates into persist's ((key, val), ts, diff) shape.
        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        self.compare_and_append_inner(updates, next_upper)
            .await
            .inspect_err(|e| {
                if let Some(contains_fence) = contains_fence {
                    soft_assert_or_log!(
                        matches!(e, CompareAndAppendError::Fence(_)) || contains_fence,
                        "encountered an upper mismatch on a non-fencing write"
                    );
                }
            })?;

        // Pull the just-written updates into the in-memory snapshot.
        self.sync(next_upper).await?;
        Ok(next_upper)
    }

    /// Performs the raw compare-and-append from the known upper to
    /// `next_upper`, then downgrades the critical since. On an upper mismatch
    /// the in-memory state is synced before the error is returned, so callers
    /// can retry immediately.
    async fn compare_and_append_inner(
        &mut self,
        updates: impl IntoIterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
        next_upper: Timestamp,
    ) -> Result<(), CompareAndAppendError> {
        // Only writable catalogs may append durably.
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            next_upper > self.upper,
            "next_upper ({next_upper}) not greater than current upper ({})",
            self.upper,
        );

        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            // Sync first so a subsequent retry starts from the true upper.
            self.sync_to_current_upper().await?;
            return Err(e.into());
        }

        // Hold the since one tick behind the new upper.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The opaque is compared-and-swapped to itself; a mismatch means some
        // other process changed it, which is unexpected but non-fatal.
        let opaque = self.since_handle.opaque().clone();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;
        if let Some(Err(e)) = downgrade {
            soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}");
        }

        Ok(())
    }

    /// Reads the full shard contents as of the current upper via a temporary
    /// leased reader, without consolidating the results.
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

    /// Syncs the in-memory state up to the shard's current upper.
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    /// Syncs the in-memory state up to `target_upper`, recording sync count
    /// and latency metrics around the actual work in `sync_inner`.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Drains the listener until the upper reaches `target_upper`, applying
    /// complete timestamps' worth of updates as progress is observed.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        // Fail fast if we are already fenced.
        self.fenceable_token.validate()?;

        // Savepoint catalogs are not durable: just advance the in-memory upper.
        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        // Updates buffered by timestamp until progress shows them complete.
        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Every buffered timestamp strictly below the new upper
                        // is now complete and can be applied in ts order.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }

    /// Consolidates `updates`, runs each through the update applier (tracking
    /// fence tokens), appends retained updates to the in-memory snapshot, and
    /// re-consolidates the snapshot. Returns the smallest fence error, if any.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        // Apply in (ts, diff) order so retractions precede additions at a ts.
        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                // All updates are applied even after an error; errors are
                // collected and the smallest one reported afterwards.
                Err(err) => errors.push(err),
            }
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        self.consolidate();

        Ok(())
    }

    /// Consolidates the in-memory snapshot by forwarding every update to the
    /// largest timestamp present and consolidating.
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

    /// Syncs to the current upper and then runs `f` over the up-to-date
    /// in-memory trace.
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    /// Opens a temporary leased read handle on the catalog shard; callers are
    /// responsible for expiring it.
    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    /// Politely releases the write and listen handles. The critical since
    /// handle is intentionally not expired here.
    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}
707
impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Materializes the in-memory trace into a structured [`Snapshot`] and
    /// runs `f` over it. Syncs to the current upper first (via `with_trace`).
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        /// Applies a single +1/-1 update to a snapshot map, asserting that
        /// insertions never overwrite and retractions match the stored value.
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    // Audit log entries are not part of `Snapshot`.
                    StateUpdateKind::AuditLog(_key, ()) => {}
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    // Fence tokens are process-coordination state, not catalog
                    // contents, so they are not part of `Snapshot`.
                    StateUpdateKind::FenceToken(_token) => {}
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

    /// Reads the shard contents as of the currently known upper via a
    /// temporary leased reader, decoding each update into `StateUpdate`.
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}
841
/// Update applier state for an unopened catalog: tracks only the configs and
/// settings that are readable before the catalog is opened.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// Config key-value pairs (e.g. user version) seen so far.
    configs: BTreeMap<String, u64>,
    /// Setting name-value pairs seen so far.
    settings: BTreeMap<String, String>,
}
850
851impl UnopenedCatalogStateInner {
852 fn new() -> UnopenedCatalogStateInner {
853 UnopenedCatalogStateInner {
854 configs: BTreeMap::new(),
855 settings: BTreeMap::new(),
856 }
857 }
858}
859
impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    /// Tracks configs, settings, and fence tokens from raw JSON updates; all
    /// other kinds are passed through untouched. Every update is retained.
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        // Only a subset of kinds is guaranteed to deserialize before upgrade;
        // audit log entries are skipped entirely.
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                // Observing a fence token may transition us into `Fenced`.
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                _ => {}
            }
        }

        Ok(Some(update))
    }
}
910
/// A persist-backed catalog handle that has not yet been opened; stores raw
/// JSON updates and tracks only pre-open configs/settings.
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
920
921impl UnopenedPersistCatalogState {
    /// Creates a new unopened catalog handle: opens the persist handles for the
    /// org's catalog shard, ensures the shard is initialized to a non-minimum
    /// upper, snapshots its contents, and validates version compatibility.
    ///
    /// Errors with `IncompatiblePersistVersion` when the running code cannot
    /// write data written by the version recorded in the shard (or vice versa).
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Reject early if this binary cannot write data at the version already
        // recorded in the shard.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                // `i64::MIN` is the initial opaque for a brand new handle.
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Advance a brand new shard off the minimum timestamp with an empty
        // write; if the shard already exists, adopt its current upper instead.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        // The listener continues from the same as-of the snapshot covered.
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // `Writable` here is provisional; the mode is finalized when the
            // catalog is actually opened.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Populated below via `apply_updates`.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // The durable catalog content version must not exceed the running
        // binary's version (semver precedence comparison).
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1072
1073 #[mz_ore::instrument]
1074 async fn open_inner(
1075 mut self,
1076 mode: Mode,
1077 initial_ts: Timestamp,
1078 bootstrap_args: &BootstrapArgs,
1079 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1080 let mut commit_ts = self.upper;
1083 self.mode = mode;
1084
1085 match (&self.mode, &self.fenceable_token) {
1087 (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1088 return Err(DurableCatalogError::Internal(
1089 "catalog should not have fenced before opening".to_string(),
1090 )
1091 .into());
1092 }
1093 (
1094 Mode::Writable | Mode::Savepoint,
1095 FenceableToken::Initializing {
1096 current_deploy_generation: None,
1097 ..
1098 },
1099 ) => {
1100 return Err(DurableCatalogError::Internal(format!(
1101 "cannot open in mode '{:?}' without a deploy generation",
1102 self.mode,
1103 ))
1104 .into());
1105 }
1106 _ => {}
1107 }
1108
1109 let read_only = matches!(self.mode, Mode::Readonly);
1110
1111 loop {
1113 self.sync_to_current_upper().await?;
1114 commit_ts = max(commit_ts, self.upper);
1115 let (fence_updates, current_fenceable_token) = self
1116 .fenceable_token
1117 .generate_unfenced_token(self.mode)?
1118 .ok_or_else(|| {
1119 DurableCatalogError::Internal(
1120 "catalog should not have fenced before opening".to_string(),
1121 )
1122 })?;
1123 debug!(
1124 ?self.upper,
1125 ?self.fenceable_token,
1126 ?current_fenceable_token,
1127 "fencing previous catalogs"
1128 );
1129 if matches!(self.mode, Mode::Writable) {
1130 match self
1131 .compare_and_append(fence_updates.clone(), commit_ts)
1132 .await
1133 {
1134 Ok(upper) => {
1135 commit_ts = upper;
1136 }
1137 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1138 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1139 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1140 continue;
1141 }
1142 }
1143 }
1144 self.fenceable_token = current_fenceable_token;
1145 break;
1146 }
1147
1148 if matches!(self.mode, Mode::Writable) {
1149 let mut controller_handle = self
1156 .persist_client
1157 .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1158 self.shard_id,
1159 PersistClient::CONTROLLER_CRITICAL_SINCE,
1160 Opaque::encode(&i64::MIN),
1161 Diagnostics {
1162 shard_name: CATALOG_SHARD_NAME.to_string(),
1163 handle_purpose: "durable catalog state critical since (migration)"
1164 .to_string(),
1165 },
1166 )
1167 .await
1168 .expect("invalid usage");
1169
1170 let since = controller_handle.since().clone();
1171 let res = controller_handle
1172 .compare_and_downgrade_since(
1173 &Opaque::encode(&i64::MIN),
1174 (&Opaque::encode(&PersistEpoch::default()), &since),
1175 )
1176 .await;
1177 match res {
1178 Ok(_) => info!("migrated Opaque of catalog since handle"),
1179 Err(_) => { }
1180 }
1181 }
1182
1183 let is_initialized = self.is_initialized_inner();
1184 if !matches!(self.mode, Mode::Writable) && !is_initialized {
1185 return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1186 format!(
1187 "catalog tables do not exist; will not create in {:?} mode",
1188 self.mode
1189 ),
1190 )));
1191 }
1192 soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1193
1194 let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1196 .snapshot
1197 .into_iter()
1198 .partition(|(update, _, _)| update.is_audit_log());
1199 self.snapshot = snapshot;
1200 let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1201 let audit_log_handle = AuditLogIterator::new(audit_logs);
1202
1203 if is_initialized && !read_only {
1205 commit_ts = upgrade(&mut self, commit_ts).await?;
1206 }
1207
1208 debug!(
1209 ?is_initialized,
1210 ?self.upper,
1211 "initializing catalog state"
1212 );
1213 let mut catalog = PersistCatalogState {
1214 mode: self.mode,
1215 since_handle: self.since_handle,
1216 write_handle: self.write_handle,
1217 listen: self.listen,
1218 persist_client: self.persist_client,
1219 shard_id: self.shard_id,
1220 upper: self.upper,
1221 fenceable_token: self.fenceable_token,
1222 snapshot: Vec::new(),
1224 update_applier: CatalogStateInner::new(),
1225 catalog_content_version: self.catalog_content_version,
1226 bootstrap_complete: false,
1227 metrics: self.metrics,
1228 };
1229 catalog.metrics.collection_entries.reset();
1230 catalog
1233 .metrics
1234 .collection_entries
1235 .with_label_values(&[&CollectionType::AuditLog.to_string()])
1236 .add(audit_log_count.into_inner());
1237 let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1238 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1239 StateUpdate { kind, ts, diff }
1240 });
1241 catalog.apply_updates(updates)?;
1242
1243 let catalog_content_version = catalog.catalog_content_version.to_string();
1244 let txn = if is_initialized {
1245 let mut txn = catalog.transaction().await?;
1246
1247 if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1254 let old_version = txn.get_catalog_content_version();
1255 txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1256 }
1257
1258 txn.set_catalog_content_version(catalog_content_version)?;
1259 txn
1260 } else {
1261 soft_assert_eq_no_log!(
1262 catalog
1263 .snapshot
1264 .iter()
1265 .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1266 .count(),
1267 0,
1268 "trace should not contain any updates for an uninitialized catalog: {:#?}",
1269 catalog.snapshot
1270 );
1271
1272 let mut txn = catalog.transaction().await?;
1273 initialize::initialize(
1274 &mut txn,
1275 bootstrap_args,
1276 initial_ts.into(),
1277 catalog_content_version,
1278 )
1279 .await?;
1280 txn
1281 };
1282
1283 if read_only {
1284 let (txn_batch, _) = txn.into_parts();
1285 let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1287 catalog.apply_updates(updates)?;
1288 } else {
1289 txn.commit_internal(commit_ts).await?;
1290 }
1291
1292 if matches!(catalog.mode, Mode::Writable) {
1293 let write_handle = catalog
1294 .persist_client
1295 .open_writer::<SourceData, (), Timestamp, i64>(
1296 catalog.write_handle.shard_id(),
1297 Arc::new(persist_desc()),
1298 Arc::new(UnitSchema::default()),
1299 Diagnostics {
1300 shard_name: CATALOG_SHARD_NAME.to_string(),
1301 handle_purpose: "compact catalog".to_string(),
1302 },
1303 )
1304 .await
1305 .expect("invalid usage");
1306 let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1307 let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1308 let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1311 let () =
1312 mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1313 &write_handle,
1314 || fuel.get(),
1315 || wait.get(),
1316 )
1317 .await;
1318 });
1319 }
1320
1321 Ok((Box::new(catalog), audit_log_handle))
1322 }
1323
    /// Reports whether the catalog shard has been initialized, based only on
    /// already-synced local state (no persist round-trip).
    ///
    /// A non-empty config collection is treated as evidence of initialization.
    #[mz_ore::instrument]
    fn is_initialized_inner(&self) -> bool {
        !self.update_applier.configs.is_empty()
    }
1332
1333 #[mz_ore::instrument]
1337 async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1338 self.sync_to_current_upper().await?;
1339 Ok(self.update_applier.configs.get(key).cloned())
1340 }
1341
    /// Returns the catalog's persisted user version config, or `None` if the
    /// catalog is uninitialized.
    #[mz_ore::instrument]
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        self.get_current_config(USER_VERSION_KEY).await
    }
1349
1350 #[mz_ore::instrument]
1354 async fn get_current_setting(
1355 &mut self,
1356 name: &str,
1357 ) -> Result<Option<String>, DurableCatalogError> {
1358 self.sync_to_current_upper().await?;
1359 Ok(self.update_applier.settings.get(name).cloned())
1360 }
1361
1362 #[mz_ore::instrument]
1367 async fn get_catalog_content_version(
1368 &mut self,
1369 ) -> Result<Option<semver::Version>, DurableCatalogError> {
1370 let version = self
1371 .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1372 .await?;
1373 let version = version.map(|version| version.parse().expect("invalid version persisted"));
1374 Ok(version)
1375 }
1376}
1377
#[async_trait]
impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
    /// Opens the catalog in savepoint mode: writes are accepted but applied
    /// in memory only (see the `Mode::Savepoint` handling in
    /// `commit_transaction`), never durably committed.
    #[mz_ore::instrument]
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        // `boxed()` heap-allocates the large `open_inner` future.
        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    /// Opens the catalog in read-only mode. The audit log iterator is
    /// discarded since read-only callers don't consume it.
    #[mz_ore::instrument]
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
        // Read-only opens never write, so the initial timestamp is
        // irrelevant; pass the minimum.
        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
            .boxed()
            .await
            .map(|(catalog, _)| catalog)
    }

    /// Opens the catalog in fully writable mode.
    #[mz_ore::instrument]
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    /// Wraps the unopened state for debug tooling without opening it.
    #[mz_ore::instrument(level = "debug")]
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
        Ok(DebugCatalogState(*self))
    }

    /// Reports whether the catalog shard has been initialized, syncing to the
    /// current upper first so the answer is up to date.
    #[mz_ore::instrument]
    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.is_initialized_inner())
    }

    /// Returns the current epoch from the fence token. Errors if this process
    /// has been fenced (via `validate`) or the catalog is uninitialized.
    #[mz_ore::instrument]
    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .validate()?
            .map(|token| token.epoch)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    /// Returns the deploy generation from the fence token. Unlike `epoch`,
    /// this does not validate the token, so it succeeds even if this process
    /// has been fenced.
    #[mz_ore::instrument]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .token()
            .map(|token| token.deploy_generation)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    /// Reads the configured 0dt-deployment max wait; the config value is
    /// stored as milliseconds.
    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    /// Reads the configured 0dt-deployment DDL check interval; the config
    /// value is stored as milliseconds.
    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    /// Reads the panic-after-timeout flag, stored as a 0/1 integer; any other
    /// value is rejected as an invalid boolean.
    #[mz_ore::instrument(level = "debug")]
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError> {
        let value = self
            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
            .await?;
        match value {
            None => Ok(None),
            Some(0) => Ok(Some(false)),
            Some(1) => Ok(Some(true)),
            Some(v) => Err(
                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
                    "{v} is not a valid boolean value"
                )))
                .into(),
            ),
        }
    }

    /// Reports whether the system configuration has ever been synced; a
    /// missing key is treated as "never synced".
    #[mz_ore::instrument]
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
            .await
            .map(|value| value.map(|value| value > 0).unwrap_or(false))
    }

    /// Produces an unconsolidated trace of the catalog contents (it may still
    /// contain retractions); errors if the catalog is uninitialized.
    #[mz_ore::instrument]
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.snapshot_unconsolidated().await;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    /// Produces a consolidated trace of the catalog contents; errors if the
    /// catalog is uninitialized.
    #[mz_ore::instrument]
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.current_snapshot().await?;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    /// Politely releases this handle's persist resources.
    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }
}
1520
/// Update-application state for an opened catalog.
#[derive(Debug)]
struct CatalogStateInner {
    // In-memory updates that have been applied but not yet drained by
    // `sync_updates`.
    updates: VecDeque<memory::objects::StateUpdate>,
}
1527
1528impl CatalogStateInner {
1529 fn new() -> CatalogStateInner {
1530 CatalogStateInner {
1531 updates: VecDeque::new(),
1532 }
1533 }
1534}
1535
1536impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1537 fn apply_update(
1538 &mut self,
1539 update: StateUpdate<StateUpdateKind>,
1540 current_fence_token: &mut FenceableToken,
1541 metrics: &Arc<Metrics>,
1542 ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1543 if let Some(collection_type) = update.kind.collection_type() {
1544 metrics
1545 .collection_entries
1546 .with_label_values(&[&collection_type.to_string()])
1547 .add(update.diff.into_inner());
1548 }
1549
1550 {
1551 let update: Option<memory::objects::StateUpdate> = (&update)
1552 .try_into()
1553 .expect("invalid persisted update: {update:#?}");
1554 if let Some(update) = update {
1555 self.updates.push_back(update);
1556 }
1557 }
1558
1559 match (update.kind, update.diff) {
1560 (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1561 (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1563 (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1564 current_fence_token.maybe_fence(token)?;
1565 Ok(None)
1566 }
1567 (kind, diff) => Ok(Some(StateUpdate {
1568 kind,
1569 ts: update.ts,
1570 diff,
1571 })),
1572 }
1573 }
1574}
1575
/// A persist-backed handle for an opened catalog, using [`CatalogStateInner`]
/// to apply updates.
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1582
#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    /// Returns the epoch of this opened catalog. Panics if the fence token is
    /// missing, which cannot happen for an opened catalog.
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    /// Returns this catalog's metrics.
    fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    /// Politely releases this handle's persist resources.
    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }

    /// Reports whether `mark_bootstrap_complete` has been called.
    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    /// Fetches all audit log events directly from persist (they are not kept
    /// in the in-memory snapshot; see `apply_update`), sorted by sortable ID.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        // Decode the proto representation, failing on the first error.
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    /// Returns the next unallocated ID for the named ID allocator. Panics if
    /// no allocator with that name exists.
    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    /// Returns the deploy generation of this opened catalog. Panics if the
    /// fence token is missing, which cannot happen for an opened catalog.
    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    /// Returns a snapshot of the current catalog contents.
    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    /// Syncs to the shard's current upper and drains all buffered in-memory
    /// updates up to it.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    /// Syncs to `target_upper` and returns buffered updates with timestamps
    /// strictly less than `target_upper`; later updates remain buffered.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        while let Some(update) = self.update_applier.updates.front() {
            // Updates at or beyond the target upper are not yet "visible".
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    /// Returns the shard's current upper.
    async fn current_upper(&mut self) -> Timestamp {
        self.current_upper().await
    }
}
1697
#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    /// Reports whether this catalog rejects durable writes.
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    /// Reports whether this catalog applies writes in memory only.
    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    /// Marks bootstrap as complete and, for writable catalogs, upgrades the
    /// critical since handle's version.
    async fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
        if matches!(self.mode, Mode::Writable) {
            self.since_handle
                .upgrade_version()
                .await
                .expect("invalid usage")
        }
    }

    /// Starts a new transaction based on a fresh snapshot, to be committed at
    /// the current upper.
    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Starts a new transaction from a caller-provided snapshot.
    fn transaction_from_snapshot(
        &mut self,
        snapshot: Snapshot,
    ) -> Result<Transaction, CatalogError> {
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    /// Commits a transaction batch at `commit_ts`, returning the new upper.
    ///
    /// Read-only catalogs accept only empty batches; savepoint catalogs apply
    /// updates in memory only; writable catalogs durably append via
    /// compare-and-append.
    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        // Inner fn so the whole commit can be wrapped in the latency metric
        // below.
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            // Read-only catalogs may "commit" only empty batches.
            if catalog.mode == Mode::Readonly {
                let updates: Vec<_> = StateUpdate::from_txn_batch(txn_batch).collect();
                if !updates.is_empty() {
                    return Err(DurableCatalogError::NotWritable(format!(
                        "cannot commit a transaction in a read-only catalog: {updates:#?}"
                    ))
                    .into());
                }
                return Ok(catalog.upper);
            }

            // The transaction must be based on the catalog's current upper;
            // anything else indicates a concurrent transaction.
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                // Savepoint catalogs only update in-memory state and advance
                // the in-memory upper.
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => unreachable!("handled above"),
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    /// Advances the catalog's upper to `new_upper` without committing any
    /// updates. A `new_upper` that does not advance the current upper is
    /// logged (or panics in debug builds) and treated as a no-op `Ok`.
    #[mz_ore::instrument(level = "debug")]
    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError> {
        if self.upper >= new_upper {
            soft_panic_or_log!(
                "new_upper ({new_upper}) not greater than current upper ({})",
                self.upper
            );
            return Ok(());
        }

        match self.mode {
            Mode::Writable => self
                .compare_and_append_inner([], new_upper)
                .await
                .map_err(|e| e.unwrap_fence_error())?,
            // Savepoint catalogs track the upper in memory only (updated
            // below).
            Mode::Savepoint => (),
            Mode::Readonly => {
                return Err(DurableCatalogError::NotWritable(
                    "cannot advance upper of a read-only catalog".into(),
                )
                .into());
            }
        }

        self.upper = new_upper;
        Ok(())
    }

    /// Returns the ID of the persist shard backing this catalog.
    fn shard_id(&self) -> ShardId {
        self.shard_id
    }
}
1848
1849pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1851 let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1852 soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1853 let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1854 ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1855}
1856
1857fn as_of(
1860 read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1861 upper: Timestamp,
1862) -> Timestamp {
1863 let since = read_handle.since().clone();
1864 let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1865 panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1866 });
1867 soft_assert_or_log!(
1870 since.less_equal(&as_of),
1871 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1872 );
1873 as_of.advance_by(since.borrow());
1876 as_of
1877}
1878
1879async fn fetch_catalog_shard_version(
1882 persist_client: &PersistClient,
1883 catalog_shard_id: ShardId,
1884) -> Option<semver::Version> {
1885 let shard_state = persist_client
1886 .inspect_shard::<Timestamp>(&catalog_shard_id)
1887 .await
1888 .ok()?;
1889 let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1890 let json_version = json_state
1891 .get("applier_version")
1892 .cloned()
1893 .expect("missing applier_version");
1894 let version = serde_json::from_value(json_version).expect("version deserialization error");
1895 Some(version)
1896}
1897
/// Takes a snapshot of the catalog shard at `as_of`, recording snapshot count
/// and latency metrics around the inner fetch.
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
    metrics: &Arc<Metrics>,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    metrics.snapshots_taken.inc();
    let counter = metrics.snapshot_latency_seconds.clone();
    snapshot_binary_inner(read_handle, as_of)
        .wall_time()
        .inc_by(counter)
        .await
}
1915
1916#[mz_ore::instrument(level = "debug")]
1921async fn snapshot_binary_inner(
1922 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1923 as_of: Timestamp,
1924) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1925 let snapshot = read_handle
1926 .snapshot_and_fetch(Antichain::from_elem(as_of))
1927 .await
1928 .expect("we have advanced the restart_as_of by the since");
1929 soft_assert_no_log!(
1930 snapshot.iter().all(|(_, _, diff)| *diff == 1),
1931 "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1932 );
1933 snapshot
1934 .into_iter()
1935 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1936 .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1937}
1938
1939pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
1944 antichain
1945 .into_option()
1946 .expect("we use a totally ordered time and never finalize the shard")
1947}
1948
impl Trace {
    /// Builds a [`Trace`] by routing each update in `snapshot` to the
    /// sub-trace for its collection, based on the update's kind.
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                // Fence tokens are transient coordination state, not catalog
                // contents, so they are not recorded in the trace.
                StateUpdateKind::FenceToken(_) => {
                }
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}
2012
impl UnopenedPersistCatalogState {
    /// Manually inserts or overwrites `(key, value)` in collection `T`,
    /// returning the previous value, if any. Retries on upper mismatches
    /// until the write lands; panics if the key maps to multiple values.
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            let key = key.clone();
            let value = value.clone();
            // Re-read current state on every attempt: a concurrent writer may
            // have changed it since the last try.
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            // Retract any existing values for the key, then insert the new
            // one.
            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        // Concurrent write won the race; retry from scratch.
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

    /// Manually deletes all values for `key` in collection `T`, retrying on
    /// upper mismatches until the write lands.
    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            let key = key.clone();
            // Re-read current state on every attempt (see `debug_edit`).
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            // Build retractions for every existing value under `key`.
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        // Concurrent write won the race; retry from scratch.
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

    /// Syncs to the shard's current upper, consolidates the local snapshot,
    /// and returns it decoded into typed state updates. Panics if any update
    /// fails to decode.
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}