1#[cfg(test)]
11mod tests;
12
13use std::cmp::max;
14use std::collections::{BTreeMap, VecDeque};
15use std::fmt::Debug;
16use std::str::FromStr;
17use std::sync::{Arc, LazyLock};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use differential_dataflow::lattice::Lattice;
22use futures::{FutureExt, StreamExt};
23use itertools::Itertools;
24use mz_audit_log::VersionedEvent;
25use mz_ore::metrics::MetricsFutureExt;
26use mz_ore::now::EpochMillis;
27use mz_ore::{
28 soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
29 soft_assert_or_log, soft_panic_or_log,
30};
31use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
32use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
33use mz_persist_client::critical::{CriticalReaderId, Opaque, SinceHandle};
34use mz_persist_client::error::UpperMismatch;
35use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
36use mz_persist_client::write::WriteHandle;
37use mz_persist_client::{Diagnostics, PersistClient, ShardId};
38use mz_persist_types::codec_impls::UnitSchema;
39use mz_proto::{RustType, TryFromProtoError};
40use mz_repr::Diff;
41use mz_storage_client::controller::PersistEpoch;
42use mz_storage_types::StorageDiff;
43use mz_storage_types::sources::SourceData;
44use sha2::Digest;
45use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
46use tracing::{debug, info, warn};
47use uuid::Uuid;
48
49use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
50use crate::durable::error::FenceError;
51use crate::durable::initialize::{
52 ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
53 WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
54};
55use crate::durable::metrics::Metrics;
56use crate::durable::objects::state_update::{
57 IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
58 TryIntoStateUpdateKind,
59};
60use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
61use crate::durable::transaction::TransactionBatch;
62use crate::durable::upgrade::upgrade;
63use crate::durable::{
64 AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
65 DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
66 ReadOnlyDurableCatalogState, Transaction, initialize, persist_desc,
67};
68use crate::memory;
69
/// Timestamp type used for all catalog updates in this module; an alias of
/// [`mz_repr::Timestamp`].
pub(crate) type Timestamp = mz_repr::Timestamp;

/// Epoch that [`FenceableToken::generate_unfenced_token`] starts from when no fence token has
/// been observed in the catalog yet.
// SAFETY: the literal `1` is non-zero. NOTE(review): this presumes that being non-zero is the
// only invariant `Epoch::new_unchecked` requires -- confirm against `Epoch`'s definition.
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

/// Shard name reported in the persist [`Diagnostics`] of every handle this file opens on the
/// catalog shard.
const CATALOG_SHARD_NAME: &str = "catalog";

/// Critical reader ID used to open the catalog's critical since handle.
///
/// The literal is parsed lazily on first access; a parse failure panics.
static CATALOG_CRITICAL_SINCE: LazyLock<CriticalReaderId> = LazyLock::new(|| {
    "c55555555-6666-7777-8888-999999999999"
        .parse()
        .expect("valid CriticalReaderId")
});

/// Seed passed to `shard_id`, together with the organization ID, to derive the catalog shard ID.
const CATALOG_SEED: usize = 1;
/// NOTE(review): not referenced in the visible part of this file; presumably retained so the
/// value is never reused as a seed -- confirm before removing.
const _UPGRADE_SEED: usize = 2;
/// NOTE(review): not referenced in the visible part of this file; presumably retained so the
/// value is never reused as a seed -- confirm before removing.
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
/// NOTE(review): not referenced in the visible part of this file; presumably retained so the
/// value is never reused as a seed -- confirm before removing.
pub const _EXPRESSION_CACHE_SEED: usize = 4;
98
/// The mode in which a [`PersistHandle`] operates on the catalog shard.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    /// The handle follows the shard but must not write to it;
    /// [`PersistHandle::compare_and_append`] asserts that the mode is `Writable`.
    Readonly,
    /// The handle does not follow the shard after opening: syncing only advances the in-memory
    /// upper and `current_upper` reports that in-memory value instead of asking persist.
    Savepoint,
    /// The handle may write to the shard. This is the only mode in which
    /// [`FenceableToken::generate_unfenced_token`] increments the epoch.
    Writable,
}
109
/// Tracks the fence token of this catalog handle and whether the handle has been fenced out by
/// a token observed in the catalog.
#[derive(Debug)]
pub(crate) enum FenceableToken {
    /// The handle has not generated its own token yet.
    Initializing {
        /// The largest fence token seen so far via [`FenceableToken::maybe_fence`], if any.
        durable_token: Option<FenceToken>,
        /// The deploy generation supplied when the token was created, if any.
        current_deploy_generation: Option<u64>,
    },
    /// The handle owns `current_token` and no greater token has been observed.
    Unfenced { current_token: FenceToken },
    /// The handle has been fenced out.
    Fenced {
        /// The token this handle held when it was fenced.
        current_token: FenceToken,
        /// The greater token that fenced this handle.
        fence_token: FenceToken,
    },
}
129
130impl FenceableToken {
131 fn new(current_deploy_generation: Option<u64>) -> Self {
133 Self::Initializing {
134 durable_token: None,
135 current_deploy_generation,
136 }
137 }
138
139 fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
141 match self {
142 FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
143 FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
144 FenceableToken::Fenced {
145 current_token,
146 fence_token,
147 } => {
148 assert!(
149 fence_token > current_token,
150 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
151 );
152 if fence_token.deploy_generation > current_token.deploy_generation {
153 Err(FenceError::DeployGeneration {
154 current_generation: current_token.deploy_generation,
155 fence_generation: fence_token.deploy_generation,
156 })
157 } else {
158 assert!(
159 fence_token.epoch > current_token.epoch,
160 "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
161 );
162 Err(FenceError::Epoch {
163 current_epoch: current_token.epoch,
164 fence_epoch: fence_token.epoch,
165 })
166 }
167 }
168 }
169 }
170
171 fn token(&self) -> Option<FenceToken> {
173 match self {
174 FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
175 FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
176 FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
177 }
178 }
179
    /// Records that `token` was observed in the catalog and transitions to
    /// [`FenceableToken::Fenced`] if it fences this handle out.
    ///
    /// Returns `Err` if the handle is fenced (either by `token` or already beforehand), `Ok`
    /// otherwise.
    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                // Remember the greatest token observed so far.
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                // While initializing, the handle is only fenced by a strictly higher deploy
                // generation than its own (when its own generation is known).
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        // The handle has no token of its own yet, so one is synthesized from its
                        // deploy generation and the observed epoch.
                        // NOTE(review): `validate` asserts `fence_token > current_token`; this
                        // presumably holds because `FenceToken` orders by deploy generation
                        // first -- confirm against `FenceToken`'s `Ord` impl.
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                // Any strictly greater token fences an unfenced handle.
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                // Already fenced: keep reporting the original fence error.
                self.validate()?;
            }
        }

        Ok(())
    }
225
226 fn generate_unfenced_token(
230 &self,
231 mode: Mode,
232 ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
233 let (durable_token, current_deploy_generation) = match self {
234 FenceableToken::Initializing {
235 durable_token,
236 current_deploy_generation,
237 } => (durable_token.clone(), current_deploy_generation.clone()),
238 FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
239 };
240
241 let mut fence_updates = Vec::with_capacity(2);
242
243 if let Some(durable_token) = &durable_token {
244 fence_updates.push((
245 StateUpdateKind::FenceToken(durable_token.clone()),
246 Diff::MINUS_ONE,
247 ));
248 }
249
250 let current_deploy_generation = current_deploy_generation
251 .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
252 .ok_or(DurableCatalogError::Uninitialized)?;
254 let mut current_epoch = durable_token
255 .map(|token| token.epoch)
256 .unwrap_or(MIN_EPOCH)
257 .get();
258 if matches!(mode, Mode::Writable) {
260 current_epoch = current_epoch + 1;
261 }
262 let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
263 let current_token = FenceToken {
264 deploy_generation: current_deploy_generation,
265 epoch: current_epoch,
266 };
267
268 fence_updates.push((
269 StateUpdateKind::FenceToken(current_token.clone()),
270 Diff::ONE,
271 ));
272
273 let current_fenceable_token = FenceableToken::Unfenced { current_token };
274
275 Ok(Some((fence_updates, current_fenceable_token)))
276 }
277}
278
/// An error returned by [`PersistHandle::compare_and_append`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    /// This handle has been fenced out.
    #[error(transparent)]
    Fence(#[from] FenceError),
    /// The upper the write expected did not match the shard's actual upper, i.e. the write was
    /// not applied.
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        /// The upper this handle expected the shard to have.
        expected_upper: Timestamp,
        /// The upper the shard actually had.
        actual_upper: Timestamp,
    },
}
294
295impl CompareAndAppendError {
296 pub(crate) fn unwrap_fence_error(self) -> FenceError {
297 match self {
298 CompareAndAppendError::Fence(e) => e,
299 e @ CompareAndAppendError::UpperMismatch { .. } => {
300 panic!("unexpected upper mismatch: {e:?}")
301 }
302 }
303 }
304}
305
306impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
307 fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
308 Self::UpperMismatch {
309 expected_upper: antichain_to_timestamp(upper_mismatch.expected),
310 actual_upper: antichain_to_timestamp(upper_mismatch.current),
311 }
312 }
313}
314
/// In-memory state that catalog updates of kind `T` can be applied to.
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    /// Applies `update` to `self`.
    ///
    /// `current_fence_token` is the fence state of the calling handle and may be updated by the
    /// implementation. Returning `Ok(Some(update))` asks the caller to retain the update:
    /// [`PersistHandle::apply_updates`] pushes it onto its in-memory snapshot. `Ok(None)` means
    /// nothing is retained. An `Err` reports that the handle has been fenced.
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}
326
/// A handle to the persist shard that stores the catalog.
///
/// `T` is the kind of update kept in the in-memory snapshot and `U` is the state that updates
/// are applied to as they are read from the shard.
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    /// The mode this handle operates in.
    pub(crate) mode: Mode,
    /// Critical since handle of the catalog shard; downgraded after every successful write.
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Write handle used to append to the shard and to fetch its recent upper.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// Listener that new updates are read from while syncing.
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    /// Persist client, used to open temporary leased readers on the shard.
    persist_client: PersistClient,
    /// ID of the catalog shard.
    shard_id: ShardId,
    /// Updates that `update_applier` asked to retain, consolidated after every batch of applied
    /// updates.
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    /// State that every update read from the shard is applied to.
    update_applier: U,
    /// The upper of the shard that this handle has synced to.
    pub(crate) upper: Timestamp,
    /// Fence state of this handle.
    fenceable_token: FenceableToken,
    /// The catalog content version this handle was created with.
    catalog_content_version: semver::Version,
    /// NOTE(review): not read in the visible part of this file; presumably set once bootstrap
    /// has finished -- confirm against the rest of the file.
    bootstrap_complete: bool,
    /// Metrics for the durable catalog.
    metrics: Arc<Metrics>,
}
371
372impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
373 #[mz_ore::instrument]
375 async fn current_upper(&mut self) -> Timestamp {
376 match self.mode {
377 Mode::Writable | Mode::Readonly => {
378 let upper = self.write_handle.fetch_recent_upper().await;
379 antichain_to_timestamp(upper.clone())
380 }
381 Mode::Savepoint => self.upper,
382 }
383 }
384
    /// Appends `updates` to the catalog shard at `commit_ts`, provided the shard's upper still
    /// equals this handle's upper, and then syncs to the new upper.
    ///
    /// Returns the new upper (`commit_ts` stepped forward) on success.
    ///
    /// # Errors
    ///
    /// Returns [`CompareAndAppendError::Fence`] if the handle turns out to be fenced, and
    /// [`CompareAndAppendError::UpperMismatch`] if the shard's upper did not match and syncing
    /// did not surface a fence error.
    ///
    /// # Panics
    ///
    /// Panics if the handle is not in [`Mode::Writable`], if `commit_ts` is less than the
    /// handle's upper, or (with soft assertions enabled) if an upper mismatch occurs for a write
    /// that neither was fenced nor contained a fence token retraction and addition.
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        // `updates` is consumed by the write below, so the information needed by the soft
        // assertion on upper mismatch is captured up front, and only when soft assertions are
        // enabled because it requires cloning `updates`.
        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                // Updates that cannot be decoded are skipped for the purpose of this check.
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            // A write is "fencing" if it both retracts and adds a fence token.
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        // Encode every update as a persist row at `commit_ts`.
        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            // Something else wrote to the shard. Syncing applies those writes and returns a
            // fence error if one of them fenced this handle.
            self.sync_to_current_upper().await?;
            // If syncing did not report a fence, the only accepted explanation is that this
            // write was itself trying to fence other handles.
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        // Downgrade the since to one less than the new upper.
        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        // The handle's own opaque value is used as both the expected and the new opaque.
        let opaque = self.since_handle.opaque().clone();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        match downgrade {
            // No downgrade was attempted.
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e:?}, but expected {opaque:?}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        // Read back what was just written so the in-memory state reflects it.
        self.sync(next_upper).await?;
        Ok(next_upper)
    }
489
490 #[mz_ore::instrument]
493 async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
494 let current_upper = self.current_upper().await;
495
496 let mut snapshot = Vec::new();
497 let mut read_handle = self.read_handle().await;
498 let as_of = as_of(&read_handle, current_upper);
499 let mut stream = Box::pin(
500 read_handle
502 .snapshot_and_stream(Antichain::from_elem(as_of))
503 .await
504 .expect("we have advanced the restart_as_of by the since"),
505 );
506 while let Some(update) = stream.next().await {
507 snapshot.push(update)
508 }
509 read_handle.expire().await;
510 snapshot
511 .into_iter()
512 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
513 .map(|state_update| state_update.try_into().expect("kind decoding error"))
514 .collect()
515 }
516
517 #[mz_ore::instrument]
521 pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
522 let upper = self.current_upper().await;
523 self.sync(upper).await
524 }
525
526 #[mz_ore::instrument(level = "debug")]
530 pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
531 self.metrics.syncs.inc();
532 let counter = self.metrics.sync_latency_seconds.clone();
533 self.sync_inner(target_upper)
534 .wall_time()
535 .inc_by(counter)
536 .await
537 }
538
    /// Reads updates from the listener and applies them until this handle's upper has reached
    /// `target_upper`.
    ///
    /// In [`Mode::Savepoint`] nothing is read; only the in-memory upper is advanced.
    ///
    /// Returns a [`FenceError`] if the handle is already fenced or becomes fenced by an applied
    /// update.
    ///
    /// # Panics
    ///
    /// Panics if a buffered update has a timestamp that is not below the reported progress, if
    /// an update cannot be decoded into `T`, or if buffered updates remain once the target upper
    /// is reached.
    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        // Fail fast if this handle is already known to be fenced.
        self.fenceable_token.validate()?;

        if self.mode == Mode::Savepoint {
            // Never move the upper backwards.
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        // Updates received from the listener, grouped by timestamp, waiting for the next
        // progress event.
        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        // Apply the buffered updates one timestamp at a time, in ascending
                        // order. Decoding into `T` happens lazily per timestamp, so an error
                        // (e.g. a fence) at an earlier timestamp returns before later updates
                        // are decoded.
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        // Buffer the updates until a progress event arrives.
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }
591
592 #[mz_ore::instrument(level = "debug")]
593 pub(crate) fn apply_updates(
594 &mut self,
595 updates: impl IntoIterator<Item = StateUpdate<T>>,
596 ) -> Result<(), FenceError> {
597 let mut updates: Vec<_> = updates
598 .into_iter()
599 .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
600 .collect();
601
602 differential_dataflow::consolidation::consolidate_updates(&mut updates);
606
607 updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));
610
611 let mut errors = Vec::new();
612
613 for (kind, ts, diff) in updates {
614 if diff != Diff::ONE && diff != Diff::MINUS_ONE {
615 panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
616 }
617
618 match self.update_applier.apply_update(
619 StateUpdate { kind, ts, diff },
620 &mut self.fenceable_token,
621 &self.metrics,
622 ) {
623 Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
624 Ok(None) => {}
625 Err(err) => errors.push(err),
628 }
629 }
630
631 errors.sort();
632 if let Some(err) = errors.into_iter().next() {
633 return Err(err);
634 }
635
636 self.consolidate();
637
638 Ok(())
639 }
640
641 #[mz_ore::instrument]
642 pub(crate) fn consolidate(&mut self) {
643 soft_assert_no_log!(
644 self.snapshot
645 .windows(2)
646 .all(|updates| updates[0].1 <= updates[1].1),
647 "snapshot should be sorted by timestamp, {:#?}",
648 self.snapshot
649 );
650
651 let new_ts = self
652 .snapshot
653 .last()
654 .map(|(_, ts, _)| *ts)
655 .unwrap_or_else(Timestamp::minimum);
656 for (_, ts, _) in &mut self.snapshot {
657 *ts = new_ts;
658 }
659 differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
660 }
661
662 async fn with_trace<R>(
666 &mut self,
667 f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
668 ) -> Result<R, CatalogError> {
669 self.sync_to_current_upper().await?;
670 f(&self.snapshot)
671 }
672
673 async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
675 self.persist_client
676 .open_leased_reader(
677 self.shard_id,
678 Arc::new(persist_desc()),
679 Arc::new(UnitSchema::default()),
680 Diagnostics {
681 shard_name: CATALOG_SHARD_NAME.to_string(),
682 handle_purpose: "openable durable catalog state temporary reader".to_string(),
683 },
684 USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
685 )
686 .await
687 .expect("invalid usage")
688 }
689
690 async fn expire(self: Box<Self>) {
692 self.write_handle.expire().await;
693 self.listen.expire().await;
694 }
695}
696
697impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
    /// Syncs to the current upper, builds a [`Snapshot`] from the in-memory trace, and runs `f`
    /// on it, returning its result.
    ///
    /// # Panics
    ///
    /// Panics if the trace contains an update whose diff is neither `1` nor `-1`, if an
    /// insertion would overwrite an existing value, or if a retraction does not match the
    /// existing value.
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        /// Applies a single insertion (`diff == 1`) or retraction (`diff == -1`) of
        /// `key -> value` to `map`, asserting that it is consistent with the map's contents.
        /// Any other diff leaves `map` untouched.
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                // Route every update to the snapshot collection that matches its kind.
                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {
                        // Audit log entries are not added to the snapshot.
                    }
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {
                        // Fence tokens are not added to the snapshot.
                    }
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }
812
813 #[mz_ore::instrument(level = "debug")]
820 async fn persist_snapshot(&self) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
821 let mut read_handle = self.read_handle().await;
822 let as_of = as_of(&read_handle, self.upper);
823 let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
824 .await
825 .map(|update| update.try_into().expect("kind decoding error"));
826 read_handle.expire().await;
827 snapshot
828 }
829}
830
/// In-memory state kept while the catalog is still unopened.
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    /// Config values currently present, keyed by config key.
    configs: BTreeMap<String, u64>,
    /// Setting values currently present, keyed by setting name.
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    /// Creates a state with no configs and no settings.
    fn new() -> UnopenedCatalogStateInner {
        Self {
            configs: BTreeMap::default(),
            settings: BTreeMap::default(),
        }
    }
}
848
849impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
850 fn apply_update(
851 &mut self,
852 update: StateUpdate<StateUpdateKindJson>,
853 current_fence_token: &mut FenceableToken,
854 _metrics: &Arc<Metrics>,
855 ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
856 if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
857 let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
858 match (kind, update.diff) {
859 (StateUpdateKind::Config(key, value), Diff::ONE) => {
860 let prev = self.configs.insert(key.key, value.value);
861 assert_eq!(
862 prev, None,
863 "values must be explicitly retracted before inserting a new value"
864 );
865 }
866 (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
867 let prev = self.configs.remove(&key.key);
868 assert_eq!(
869 prev,
870 Some(value.value),
871 "retraction does not match existing value"
872 );
873 }
874 (StateUpdateKind::Setting(key, value), Diff::ONE) => {
875 let prev = self.settings.insert(key.name, value.value);
876 assert_eq!(
877 prev, None,
878 "values must be explicitly retracted before inserting a new value"
879 );
880 }
881 (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
882 let prev = self.settings.remove(&key.name);
883 assert_eq!(
884 prev,
885 Some(value.value),
886 "retraction does not match existing value"
887 );
888 }
889 (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
890 current_fence_token.maybe_fence(fence_token)?;
891 }
892 _ => {}
893 }
894 }
895
896 Ok(Some(update))
897 }
898}
899
/// A [`PersistHandle`] for a catalog that has not been opened yet.
///
/// Updates are kept in their [`StateUpdateKindJson`] form and applied to an
/// [`UnopenedCatalogStateInner`].
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;
909
910impl UnopenedPersistCatalogState {
    /// Creates a new unopened catalog handle backed by the catalog shard of `organization_id`.
    ///
    /// Opens the persist handles on the shard, reads a snapshot, and applies it to the
    /// in-memory state. `version` is the catalog content version of the running code and
    /// `deploy_generation` seeds the handle's [`FenceableToken`].
    ///
    /// # Errors
    ///
    /// Returns [`DurableCatalogError::IncompatiblePersistVersion`] if `version` cannot write
    /// data of the version found in the shard, or if `version` has lower precedence than the
    /// catalog content version found in the catalog. Errors from applying the snapshot or from
    /// reading the catalog content version are propagated.
    ///
    /// # Panics
    ///
    /// Panics if persist reports invalid usage while opening handles, writing the initial empty
    /// batch, or creating the listener.
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        debug!(?catalog_shard_id, "new persist backed catalog state");

        // Refuse to touch the shard if this code version is not allowed to write data of the
        // version already recorded for the shard.
        let version_in_catalog_shard =
            fetch_catalog_shard_version(&persist_client, catalog_shard_id).await;
        if let Some(version_in_catalog_shard) = version_in_catalog_shard {
            if !mz_persist_client::cfg::code_can_write_data(&version, &version_in_catalog_shard) {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_catalog_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                CATALOG_CRITICAL_SINCE.clone(),
                Opaque::encode(&i64::MIN),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");

        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(persist_desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        // Try to advance the shard's upper past the minimum timestamp with an empty batch. If
        // the upper has already moved, use the upper reported by the mismatch instead.
        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        // The listener starts at the same timestamp the snapshot was taken at.
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            // Initial mode; `open_inner` overwrites it with the requested mode.
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            // Populated by `apply_updates` below.
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        // Refuse to proceed if the catalog was written by a version with higher precedence than
        // the running code.
        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle
                .catalog_content_version
                .cmp_precedence(&found_version)
                == std::cmp::Ordering::Less
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }
1061
1062 #[mz_ore::instrument]
1063 async fn open_inner(
1064 mut self,
1065 mode: Mode,
1066 initial_ts: Timestamp,
1067 bootstrap_args: &BootstrapArgs,
1068 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1069 let mut commit_ts = self.upper;
1072 self.mode = mode;
1073
1074 match (&self.mode, &self.fenceable_token) {
1076 (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
1077 return Err(DurableCatalogError::Internal(
1078 "catalog should not have fenced before opening".to_string(),
1079 )
1080 .into());
1081 }
1082 (
1083 Mode::Writable | Mode::Savepoint,
1084 FenceableToken::Initializing {
1085 current_deploy_generation: None,
1086 ..
1087 },
1088 ) => {
1089 return Err(DurableCatalogError::Internal(format!(
1090 "cannot open in mode '{:?}' without a deploy generation",
1091 self.mode,
1092 ))
1093 .into());
1094 }
1095 _ => {}
1096 }
1097
1098 let read_only = matches!(self.mode, Mode::Readonly);
1099
1100 loop {
1102 self.sync_to_current_upper().await?;
1103 commit_ts = max(commit_ts, self.upper);
1104 let (fence_updates, current_fenceable_token) = self
1105 .fenceable_token
1106 .generate_unfenced_token(self.mode)?
1107 .ok_or_else(|| {
1108 DurableCatalogError::Internal(
1109 "catalog should not have fenced before opening".to_string(),
1110 )
1111 })?;
1112 debug!(
1113 ?self.upper,
1114 ?self.fenceable_token,
1115 ?current_fenceable_token,
1116 "fencing previous catalogs"
1117 );
1118 if matches!(self.mode, Mode::Writable) {
1119 match self
1120 .compare_and_append(fence_updates.clone(), commit_ts)
1121 .await
1122 {
1123 Ok(upper) => {
1124 commit_ts = upper;
1125 }
1126 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
1127 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
1128 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
1129 continue;
1130 }
1131 }
1132 }
1133 self.fenceable_token = current_fenceable_token;
1134 break;
1135 }
1136
1137 if matches!(self.mode, Mode::Writable) {
1138 let mut controller_handle = self
1145 .persist_client
1146 .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
1147 self.shard_id,
1148 PersistClient::CONTROLLER_CRITICAL_SINCE,
1149 Opaque::encode(&i64::MIN),
1150 Diagnostics {
1151 shard_name: CATALOG_SHARD_NAME.to_string(),
1152 handle_purpose: "durable catalog state critical since (migration)"
1153 .to_string(),
1154 },
1155 )
1156 .await
1157 .expect("invalid usage");
1158
1159 let since = controller_handle.since().clone();
1160 let res = controller_handle
1161 .compare_and_downgrade_since(
1162 &Opaque::encode(&i64::MIN),
1163 (&Opaque::encode(&PersistEpoch::default()), &since),
1164 )
1165 .await;
1166 match res {
1167 Ok(_) => info!("migrated Opaque of catalog since handle"),
1168 Err(_) => { }
1169 }
1170 }
1171
1172 let is_initialized = self.is_initialized_inner();
1173 if !matches!(self.mode, Mode::Writable) && !is_initialized {
1174 return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
1175 format!(
1176 "catalog tables do not exist; will not create in {:?} mode",
1177 self.mode
1178 ),
1179 )));
1180 }
1181 soft_assert_ne_or_log!(self.upper, Timestamp::minimum());
1182
1183 let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
1185 .snapshot
1186 .into_iter()
1187 .partition(|(update, _, _)| update.is_audit_log());
1188 self.snapshot = snapshot;
1189 let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
1190 let audit_log_handle = AuditLogIterator::new(audit_logs);
1191
1192 if is_initialized && !read_only {
1194 commit_ts = upgrade(&mut self, commit_ts).await?;
1195 }
1196
1197 debug!(
1198 ?is_initialized,
1199 ?self.upper,
1200 "initializing catalog state"
1201 );
1202 let mut catalog = PersistCatalogState {
1203 mode: self.mode,
1204 since_handle: self.since_handle,
1205 write_handle: self.write_handle,
1206 listen: self.listen,
1207 persist_client: self.persist_client,
1208 shard_id: self.shard_id,
1209 upper: self.upper,
1210 fenceable_token: self.fenceable_token,
1211 snapshot: Vec::new(),
1213 update_applier: CatalogStateInner::new(),
1214 catalog_content_version: self.catalog_content_version,
1215 bootstrap_complete: false,
1216 metrics: self.metrics,
1217 };
1218 catalog.metrics.collection_entries.reset();
1219 catalog
1222 .metrics
1223 .collection_entries
1224 .with_label_values(&[&CollectionType::AuditLog.to_string()])
1225 .add(audit_log_count.into_inner());
1226 let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
1227 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
1228 StateUpdate { kind, ts, diff }
1229 });
1230 catalog.apply_updates(updates)?;
1231
1232 let catalog_content_version = catalog.catalog_content_version.to_string();
1233 let txn = if is_initialized {
1234 let mut txn = catalog.transaction().await?;
1235
1236 if txn.get_setting("migration_version".into()).is_none() && mode != Mode::Readonly {
1243 let old_version = txn.get_catalog_content_version();
1244 txn.set_setting("migration_version".into(), old_version.map(Into::into))?;
1245 }
1246
1247 txn.set_catalog_content_version(catalog_content_version)?;
1248 txn
1249 } else {
1250 soft_assert_eq_no_log!(
1251 catalog
1252 .snapshot
1253 .iter()
1254 .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
1255 .count(),
1256 0,
1257 "trace should not contain any updates for an uninitialized catalog: {:#?}",
1258 catalog.snapshot
1259 );
1260
1261 let mut txn = catalog.transaction().await?;
1262 initialize::initialize(
1263 &mut txn,
1264 bootstrap_args,
1265 initial_ts.into(),
1266 catalog_content_version,
1267 )
1268 .await?;
1269 txn
1270 };
1271
1272 if read_only {
1273 let (txn_batch, _) = txn.into_parts();
1274 let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
1276 catalog.apply_updates(updates)?;
1277 } else {
1278 txn.commit_internal(commit_ts).await?;
1279 }
1280
1281 if matches!(catalog.mode, Mode::Writable) {
1282 let write_handle = catalog
1283 .persist_client
1284 .open_writer::<SourceData, (), Timestamp, i64>(
1285 catalog.write_handle.shard_id(),
1286 Arc::new(persist_desc()),
1287 Arc::new(UnitSchema::default()),
1288 Diagnostics {
1289 shard_name: CATALOG_SHARD_NAME.to_string(),
1290 handle_purpose: "compact catalog".to_string(),
1291 },
1292 )
1293 .await
1294 .expect("invalid usage");
1295 let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
1296 let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
1297 let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
1300 let () =
1301 mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
1302 &write_handle,
1303 || fuel.get(),
1304 || wait.get(),
1305 )
1306 .await;
1307 });
1308 }
1309
1310 Ok((Box::new(catalog), audit_log_handle))
1311 }
1312
1313 #[mz_ore::instrument]
1318 fn is_initialized_inner(&self) -> bool {
1319 !self.update_applier.configs.is_empty()
1320 }
1321
1322 #[mz_ore::instrument]
1326 async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
1327 self.sync_to_current_upper().await?;
1328 Ok(self.update_applier.configs.get(key).cloned())
1329 }
1330
1331 #[mz_ore::instrument]
1335 pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
1336 self.get_current_config(USER_VERSION_KEY).await
1337 }
1338
1339 #[mz_ore::instrument]
1343 async fn get_current_setting(
1344 &mut self,
1345 name: &str,
1346 ) -> Result<Option<String>, DurableCatalogError> {
1347 self.sync_to_current_upper().await?;
1348 Ok(self.update_applier.settings.get(name).cloned())
1349 }
1350
1351 #[mz_ore::instrument]
1356 async fn get_catalog_content_version(
1357 &mut self,
1358 ) -> Result<Option<semver::Version>, DurableCatalogError> {
1359 let version = self
1360 .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
1361 .await?;
1362 let version = version.map(|version| version.parse().expect("invalid version persisted"));
1363 Ok(version)
1364 }
1365}
1366
1367#[async_trait]
1368impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
1369 #[mz_ore::instrument]
1370 async fn open_savepoint(
1371 mut self: Box<Self>,
1372 initial_ts: Timestamp,
1373 bootstrap_args: &BootstrapArgs,
1374 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1375 self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
1376 .boxed()
1377 .await
1378 }
1379
1380 #[mz_ore::instrument]
1381 async fn open_read_only(
1382 mut self: Box<Self>,
1383 bootstrap_args: &BootstrapArgs,
1384 ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
1385 self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
1386 .boxed()
1387 .await
1388 .map(|(catalog, _)| catalog)
1389 }
1390
1391 #[mz_ore::instrument]
1392 async fn open(
1393 mut self: Box<Self>,
1394 initial_ts: Timestamp,
1395 bootstrap_args: &BootstrapArgs,
1396 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
1397 self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
1398 .boxed()
1399 .await
1400 }
1401
1402 #[mz_ore::instrument(level = "debug")]
1403 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
1404 Ok(DebugCatalogState(*self))
1405 }
1406
1407 #[mz_ore::instrument]
1408 async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
1409 self.sync_to_current_upper().await?;
1410 Ok(self.is_initialized_inner())
1411 }
1412
1413 #[mz_ore::instrument]
1414 async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
1415 self.sync_to_current_upper().await?;
1416 self.fenceable_token
1417 .validate()?
1418 .map(|token| token.epoch)
1419 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1420 }
1421
1422 #[mz_ore::instrument]
1423 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1424 self.sync_to_current_upper().await?;
1425 self.fenceable_token
1426 .token()
1427 .map(|token| token.deploy_generation)
1428 .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
1429 }
1430
1431 #[mz_ore::instrument(level = "debug")]
1432 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
1433 let value = self
1434 .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
1435 .await?;
1436 match value {
1437 None => Ok(None),
1438 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1439 }
1440 }
1441
1442 #[mz_ore::instrument(level = "debug")]
1443 async fn get_0dt_deployment_ddl_check_interval(
1444 &mut self,
1445 ) -> Result<Option<Duration>, CatalogError> {
1446 let value = self
1447 .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
1448 .await?;
1449 match value {
1450 None => Ok(None),
1451 Some(millis) => Ok(Some(Duration::from_millis(millis))),
1452 }
1453 }
1454
1455 #[mz_ore::instrument(level = "debug")]
1456 async fn get_enable_0dt_deployment_panic_after_timeout(
1457 &mut self,
1458 ) -> Result<Option<bool>, CatalogError> {
1459 let value = self
1460 .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
1461 .await?;
1462 match value {
1463 None => Ok(None),
1464 Some(0) => Ok(Some(false)),
1465 Some(1) => Ok(Some(true)),
1466 Some(v) => Err(
1467 DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
1468 "{v} is not a valid boolean value"
1469 )))
1470 .into(),
1471 ),
1472 }
1473 }
1474
1475 #[mz_ore::instrument]
1476 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
1477 self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
1478 .await
1479 .map(|value| value.map(|value| value > 0).unwrap_or(false))
1480 }
1481
1482 #[mz_ore::instrument]
1483 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
1484 self.sync_to_current_upper().await?;
1485 if self.is_initialized_inner() {
1486 let snapshot = self.snapshot_unconsolidated().await;
1487 Ok(Trace::from_snapshot(snapshot))
1488 } else {
1489 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1490 }
1491 }
1492
1493 #[mz_ore::instrument]
1494 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
1495 self.sync_to_current_upper().await?;
1496 if self.is_initialized_inner() {
1497 let snapshot = self.current_snapshot().await?;
1498 Ok(Trace::from_snapshot(snapshot))
1499 } else {
1500 Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
1501 }
1502 }
1503
1504 #[mz_ore::instrument(level = "debug")]
1505 async fn expire(self: Box<Self>) {
1506 self.expire().await
1507 }
1508}
1509
1510#[derive(Debug)]
1512struct CatalogStateInner {
1513 updates: VecDeque<memory::objects::StateUpdate>,
1515}
1516
1517impl CatalogStateInner {
1518 fn new() -> CatalogStateInner {
1519 CatalogStateInner {
1520 updates: VecDeque::new(),
1521 }
1522 }
1523}
1524
1525impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
1526 fn apply_update(
1527 &mut self,
1528 update: StateUpdate<StateUpdateKind>,
1529 current_fence_token: &mut FenceableToken,
1530 metrics: &Arc<Metrics>,
1531 ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
1532 if let Some(collection_type) = update.kind.collection_type() {
1533 metrics
1534 .collection_entries
1535 .with_label_values(&[&collection_type.to_string()])
1536 .add(update.diff.into_inner());
1537 }
1538
1539 {
1540 let update: Option<memory::objects::StateUpdate> = (&update)
1541 .try_into()
1542 .expect("invalid persisted update: {update:#?}");
1543 if let Some(update) = update {
1544 self.updates.push_back(update);
1545 }
1546 }
1547
1548 match (update.kind, update.diff) {
1549 (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
1550 (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
1552 (StateUpdateKind::FenceToken(token), Diff::ONE) => {
1553 current_fence_token.maybe_fence(token)?;
1554 Ok(None)
1555 }
1556 (kind, diff) => Ok(Some(StateUpdate {
1557 kind,
1558 ts: update.ts,
1559 diff,
1560 })),
1561 }
1562 }
1563}
1564
1565type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;
1571
1572#[async_trait]
1573impl ReadOnlyDurableCatalogState for PersistCatalogState {
1574 fn epoch(&self) -> Epoch {
1575 self.fenceable_token
1576 .token()
1577 .expect("opened catalog state must have an epoch")
1578 .epoch
1579 }
1580
1581 fn metrics(&self) -> &Metrics {
1582 &self.metrics
1583 }
1584
1585 #[mz_ore::instrument(level = "debug")]
1586 async fn expire(self: Box<Self>) {
1587 self.expire().await
1588 }
1589
1590 fn is_bootstrap_complete(&self) -> bool {
1591 self.bootstrap_complete
1592 }
1593
1594 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
1595 self.sync_to_current_upper().await?;
1596 let audit_logs: Vec<_> = self
1597 .persist_snapshot()
1598 .await
1599 .filter_map(
1600 |StateUpdate {
1601 kind,
1602 ts: _,
1603 diff: _,
1604 }| match kind {
1605 StateUpdateKind::AuditLog(key, ()) => Some(key),
1606 _ => None,
1607 },
1608 )
1609 .collect();
1610 let mut audit_logs: Vec<_> = audit_logs
1611 .into_iter()
1612 .map(RustType::from_proto)
1613 .map_ok(|key: AuditLogKey| key.event)
1614 .collect::<Result<_, _>>()?;
1615 audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
1616 Ok(audit_logs)
1617 }
1618
1619 #[mz_ore::instrument(level = "debug")]
1620 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
1621 self.with_trace(|trace| {
1622 Ok(trace
1623 .into_iter()
1624 .rev()
1625 .filter_map(|(kind, _, _)| match kind {
1626 StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
1627 Some(value.next_id)
1628 }
1629 _ => None,
1630 })
1631 .next()
1632 .expect("must exist"))
1633 })
1634 .await
1635 }
1636
1637 #[mz_ore::instrument(level = "debug")]
1638 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
1639 self.sync_to_current_upper().await?;
1640 Ok(self
1641 .fenceable_token
1642 .token()
1643 .expect("opened catalogs must have a token")
1644 .deploy_generation)
1645 }
1646
1647 #[mz_ore::instrument(level = "debug")]
1648 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
1649 self.with_snapshot(Ok).await
1650 }
1651
1652 #[mz_ore::instrument(level = "debug")]
1653 async fn sync_to_current_updates(
1654 &mut self,
1655 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1656 let upper = self.current_upper().await;
1657 self.sync_updates(upper).await
1658 }
1659
1660 #[mz_ore::instrument(level = "debug")]
1661 async fn sync_updates(
1662 &mut self,
1663 target_upper: mz_repr::Timestamp,
1664 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
1665 self.sync(target_upper).await?;
1666 let mut updates = Vec::new();
1667 while let Some(update) = self.update_applier.updates.front() {
1668 if update.ts >= target_upper {
1669 break;
1670 }
1671
1672 let update = self
1673 .update_applier
1674 .updates
1675 .pop_front()
1676 .expect("peeked above");
1677 updates.push(update);
1678 }
1679 Ok(updates)
1680 }
1681
1682 async fn current_upper(&mut self) -> Timestamp {
1683 self.current_upper().await
1684 }
1685}
1686
1687#[async_trait]
1688#[allow(mismatched_lifetime_syntaxes)]
1689impl DurableCatalogState for PersistCatalogState {
1690 fn is_read_only(&self) -> bool {
1691 matches!(self.mode, Mode::Readonly)
1692 }
1693
1694 fn is_savepoint(&self) -> bool {
1695 matches!(self.mode, Mode::Savepoint)
1696 }
1697
1698 async fn mark_bootstrap_complete(&mut self) {
1699 self.bootstrap_complete = true;
1700 if matches!(self.mode, Mode::Writable) {
1701 self.since_handle
1702 .upgrade_version()
1703 .await
1704 .expect("invalid usage")
1705 }
1706 }
1707
1708 #[mz_ore::instrument(level = "debug")]
1709 async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
1710 self.metrics.transactions_started.inc();
1711 let snapshot = self.snapshot().await?;
1712 let commit_ts = self.upper.clone();
1713 Transaction::new(self, snapshot, commit_ts)
1714 }
1715
1716 #[mz_ore::instrument(level = "debug")]
1717 async fn commit_transaction(
1718 &mut self,
1719 txn_batch: TransactionBatch,
1720 commit_ts: Timestamp,
1721 ) -> Result<Timestamp, CatalogError> {
1722 async fn commit_transaction_inner(
1723 catalog: &mut PersistCatalogState,
1724 txn_batch: TransactionBatch,
1725 commit_ts: Timestamp,
1726 ) -> Result<Timestamp, CatalogError> {
1727 assert_eq!(
1732 catalog.upper, txn_batch.upper,
1733 "only one transaction at a time is supported"
1734 );
1735
1736 assert!(
1737 commit_ts >= catalog.upper,
1738 "expected commit ts, {}, to be greater than or equal to upper, {}",
1739 commit_ts,
1740 catalog.upper
1741 );
1742
1743 let updates = StateUpdate::from_txn_batch(txn_batch).collect();
1744 debug!("committing updates: {updates:?}");
1745
1746 let next_upper = match catalog.mode {
1747 Mode::Writable => catalog
1748 .compare_and_append(updates, commit_ts)
1749 .await
1750 .map_err(|e| e.unwrap_fence_error())?,
1751 Mode::Savepoint => {
1752 let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
1753 kind,
1754 ts: commit_ts,
1755 diff,
1756 });
1757 catalog.apply_updates(updates)?;
1758 catalog.upper = commit_ts.step_forward();
1759 catalog.upper
1760 }
1761 Mode::Readonly => {
1762 if !updates.is_empty() {
1766 return Err(DurableCatalogError::NotWritable(format!(
1767 "cannot commit a transaction in a read-only catalog: {updates:#?}"
1768 ))
1769 .into());
1770 }
1771 catalog.upper
1772 }
1773 };
1774
1775 Ok(next_upper)
1776 }
1777 self.metrics.transaction_commits.inc();
1778 let counter = self.metrics.transaction_commit_latency_seconds.clone();
1779 commit_transaction_inner(self, txn_batch, commit_ts)
1780 .wall_time()
1781 .inc_by(counter)
1782 .await
1783 }
1784
1785 #[mz_ore::instrument(level = "debug")]
1786 async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
1787 if self.is_read_only() {
1789 return Ok(());
1790 }
1791 self.sync_to_current_upper().await?;
1792 Ok(())
1793 }
1794
1795 fn shard_id(&self) -> ShardId {
1796 self.shard_id
1797 }
1798}
1799
1800pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
1802 let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
1803 soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
1804 let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
1805 ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
1806}
1807
1808fn as_of(
1811 read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1812 upper: Timestamp,
1813) -> Timestamp {
1814 let since = read_handle.since().clone();
1815 let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
1816 panic!("catalog persist shard should be initialize, found upper: {upper:?}")
1817 });
1818 soft_assert_or_log!(
1821 since.less_equal(&as_of),
1822 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
1823 );
1824 as_of.advance_by(since.borrow());
1827 as_of
1828}
1829
1830async fn fetch_catalog_shard_version(
1833 persist_client: &PersistClient,
1834 catalog_shard_id: ShardId,
1835) -> Option<semver::Version> {
1836 let shard_state = persist_client
1837 .inspect_shard::<Timestamp>(&catalog_shard_id)
1838 .await
1839 .ok()?;
1840 let json_state = serde_json::to_value(shard_state).expect("state serialization error");
1841 let json_version = json_state
1842 .get("applier_version")
1843 .cloned()
1844 .expect("missing applier_version");
1845 let version = serde_json::from_value(json_version).expect("version deserialization error");
1846 Some(version)
1847}
1848
1849#[mz_ore::instrument(level = "debug")]
1854async fn snapshot_binary(
1855 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1856 as_of: Timestamp,
1857 metrics: &Arc<Metrics>,
1858) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1859 metrics.snapshots_taken.inc();
1860 let counter = metrics.snapshot_latency_seconds.clone();
1861 snapshot_binary_inner(read_handle, as_of)
1862 .wall_time()
1863 .inc_by(counter)
1864 .await
1865}
1866
1867#[mz_ore::instrument(level = "debug")]
1872async fn snapshot_binary_inner(
1873 read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
1874 as_of: Timestamp,
1875) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
1876 let snapshot = read_handle
1877 .snapshot_and_fetch(Antichain::from_elem(as_of))
1878 .await
1879 .expect("we have advanced the restart_as_of by the since");
1880 soft_assert_no_log!(
1881 snapshot.iter().all(|(_, _, diff)| *diff == 1),
1882 "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
1883 );
1884 snapshot
1885 .into_iter()
1886 .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
1887 .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
1888}
1889
1890pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
1895 antichain
1896 .into_option()
1897 .expect("we use a totally ordered time and never finalize the shard")
1898}
1899
1900impl Trace {
1903 fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
1905 let mut trace = Trace::new();
1906 for StateUpdate { kind, ts, diff } in snapshot {
1907 match kind {
1908 StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
1909 StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
1910 StateUpdateKind::ClusterReplica(k, v) => {
1911 trace.cluster_replicas.values.push(((k, v), ts, diff))
1912 }
1913 StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
1914 StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
1915 StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
1916 StateUpdateKind::DefaultPrivilege(k, v) => {
1917 trace.default_privileges.values.push(((k, v), ts, diff))
1918 }
1919 StateUpdateKind::FenceToken(_) => {
1920 }
1922 StateUpdateKind::IdAllocator(k, v) => {
1923 trace.id_allocator.values.push(((k, v), ts, diff))
1924 }
1925 StateUpdateKind::IntrospectionSourceIndex(k, v) => {
1926 trace.introspection_sources.values.push(((k, v), ts, diff))
1927 }
1928 StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
1929 StateUpdateKind::NetworkPolicy(k, v) => {
1930 trace.network_policies.values.push(((k, v), ts, diff))
1931 }
1932 StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
1933 StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
1934 StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
1935 StateUpdateKind::SourceReferences(k, v) => {
1936 trace.source_references.values.push(((k, v), ts, diff))
1937 }
1938 StateUpdateKind::SystemConfiguration(k, v) => {
1939 trace.system_configurations.values.push(((k, v), ts, diff))
1940 }
1941 StateUpdateKind::SystemObjectMapping(k, v) => {
1942 trace.system_object_mappings.values.push(((k, v), ts, diff))
1943 }
1944 StateUpdateKind::SystemPrivilege(k, v) => {
1945 trace.system_privileges.values.push(((k, v), ts, diff))
1946 }
1947 StateUpdateKind::StorageCollectionMetadata(k, v) => trace
1948 .storage_collection_metadata
1949 .values
1950 .push(((k, v), ts, diff)),
1951 StateUpdateKind::UnfinalizedShard(k, ()) => {
1952 trace.unfinalized_shards.values.push(((k, ()), ts, diff))
1953 }
1954 StateUpdateKind::TxnWalShard((), v) => {
1955 trace.txn_wal_shard.values.push((((), v), ts, diff))
1956 }
1957 StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
1958 }
1959 }
1960 trace
1961 }
1962}
1963
1964impl UnopenedPersistCatalogState {
1965 #[mz_ore::instrument]
1967 pub(crate) async fn debug_edit<T: Collection>(
1968 &mut self,
1969 key: T::Key,
1970 value: T::Value,
1971 ) -> Result<Option<T::Value>, CatalogError>
1972 where
1973 T::Key: PartialEq + Eq + Debug + Clone,
1974 T::Value: Debug + Clone,
1975 {
1976 let prev_value = loop {
1977 let key = key.clone();
1978 let value = value.clone();
1979 let snapshot = self.current_snapshot().await?;
1980 let trace = Trace::from_snapshot(snapshot);
1981 let collection_trace = T::collection_trace(trace);
1982 let prev_values: Vec<_> = collection_trace
1983 .values
1984 .into_iter()
1985 .filter(|((k, _), _, diff)| {
1986 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
1987 &key == k
1988 })
1989 .collect();
1990
1991 let prev_value = match &prev_values[..] {
1992 [] => None,
1993 [((_, v), _, _)] => Some(v.clone()),
1994 prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
1995 };
1996
1997 let mut updates: Vec<_> = prev_values
1998 .into_iter()
1999 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2000 .collect();
2001 updates.push((T::update(key, value), Diff::ONE));
2002 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2004 Some((fence_updates, current_fenceable_token)) => {
2005 updates.extend(fence_updates.clone());
2006 match self.compare_and_append(updates, self.upper).await {
2007 Ok(_) => {
2008 self.fenceable_token = current_fenceable_token;
2009 break prev_value;
2010 }
2011 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2012 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2013 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2014 continue;
2015 }
2016 }
2017 }
2018 None => {
2019 self.compare_and_append(updates, self.upper)
2020 .await
2021 .map_err(|e| e.unwrap_fence_error())?;
2022 break prev_value;
2023 }
2024 }
2025 };
2026 Ok(prev_value)
2027 }
2028
2029 #[mz_ore::instrument]
2031 pub(crate) async fn debug_delete<T: Collection>(
2032 &mut self,
2033 key: T::Key,
2034 ) -> Result<(), CatalogError>
2035 where
2036 T::Key: PartialEq + Eq + Debug + Clone,
2037 T::Value: Debug,
2038 {
2039 loop {
2040 let key = key.clone();
2041 let snapshot = self.current_snapshot().await?;
2042 let trace = Trace::from_snapshot(snapshot);
2043 let collection_trace = T::collection_trace(trace);
2044 let mut retractions: Vec<_> = collection_trace
2045 .values
2046 .into_iter()
2047 .filter(|((k, _), _, diff)| {
2048 soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
2049 &key == k
2050 })
2051 .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
2052 .collect();
2053
2054 match self.fenceable_token.generate_unfenced_token(self.mode)? {
2056 Some((fence_updates, current_fenceable_token)) => {
2057 retractions.extend(fence_updates.clone());
2058 match self.compare_and_append(retractions, self.upper).await {
2059 Ok(_) => {
2060 self.fenceable_token = current_fenceable_token;
2061 break;
2062 }
2063 Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
2064 Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
2065 warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
2066 continue;
2067 }
2068 }
2069 }
2070 None => {
2071 self.compare_and_append(retractions, self.upper)
2072 .await
2073 .map_err(|e| e.unwrap_fence_error())?;
2074 break;
2075 }
2076 }
2077 }
2078 Ok(())
2079 }
2080
2081 async fn current_snapshot(
2086 &mut self,
2087 ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
2088 self.sync_to_current_upper().await?;
2089 self.consolidate();
2090 Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
2091 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
2092 StateUpdate { kind, ts, diff }
2093 }))
2094 }
2095}