#[cfg(test)]
mod tests;

use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use differential_dataflow::lattice::Lattice;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::now::EpochMillis;
use mz_ore::{
    soft_assert_eq_no_log, soft_assert_eq_or_log, soft_assert_ne_or_log, soft_assert_no_log,
    soft_assert_or_log, soft_panic_or_log,
};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::cli::admin::{CATALOG_FORCE_COMPACTION_FUEL, CATALOG_FORCE_COMPACTION_WAIT};
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::{Listen, ListenEvent, ReadHandle};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::{Diff, RelationDesc, SqlScalarType};
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use sha2::Digest;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::durable::debug::{Collection, CollectionType, DebugCatalogState, Trace};
use crate::durable::error::FenceError;
use crate::durable::initialize::{
    ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT, SYSTEM_CONFIG_SYNCED_KEY, USER_VERSION_KEY,
    WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL, WITH_0DT_DEPLOYMENT_MAX_WAIT,
};
use crate::durable::metrics::Metrics;
use crate::durable::objects::state_update::{
    IntoStateUpdateKindJson, StateUpdate, StateUpdateKind, StateUpdateKindJson,
    TryIntoStateUpdateKind,
};
use crate::durable::objects::{AuditLogKey, FenceToken, Snapshot};
use crate::durable::transaction::TransactionBatch;
use crate::durable::upgrade::upgrade;
use crate::durable::{
    AuditLogIterator, BootstrapArgs, CATALOG_CONTENT_VERSION_KEY, CatalogError,
    DurableCatalogError, DurableCatalogState, Epoch, OpenableDurableCatalogState,
    ReadOnlyDurableCatalogState, Transaction, initialize,
};
use crate::memory;

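/// The timestamp type used by the durable catalog.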
pub(crate) type Timestamp = mz_repr::Timestamp;

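/// The minimum value of an epoch.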
const MIN_EPOCH: Epoch = unsafe { Epoch::new_unchecked(1) };

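// Shard names (used in persist diagnostics) and the seeds used to derive shard IDs from the
// organization ID. The underscore-prefixed seeds are reserved but currently unused.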
const CATALOG_SHARD_NAME: &str = "catalog";
const UPGRADE_SHARD_NAME: &str = "catalog_upgrade";

const CATALOG_SEED: usize = 1;
const UPGRADE_SEED: usize = 2;
pub const _BUILTIN_MIGRATION_SEED: usize = 3;
pub const _EXPRESSION_CACHE_SEED: usize = 4;

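/// The mode in which the catalog is opened: read-only (no writes), savepoint (writes are applied
/// in memory but never committed durably), or writable.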
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum Mode {
    Readonly,
    Savepoint,
    Writable,
}

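/// Tracks the fence token of this catalog handle and whether a higher token has been observed.
/// A handle starts out `Initializing`, transitions to `Unfenced` once it has installed its own
/// token, and becomes `Fenced` when it observes a token with a greater deploy generation or epoch.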
#[derive(Debug)]
pub(crate) enum FenceableToken {
    Initializing {
        durable_token: Option<FenceToken>,
        current_deploy_generation: Option<u64>,
    },
    Unfenced { current_token: FenceToken },
    Fenced {
        current_token: FenceToken,
        fence_token: FenceToken,
    },
}

impl FenceableToken {
    fn new(current_deploy_generation: Option<u64>) -> Self {
        Self::Initializing {
            durable_token: None,
            current_deploy_generation,
        }
    }

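    /// Returns the current token if this handle has not been fenced, otherwise returns the
    /// corresponding [`FenceError`].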
    fn validate(&self) -> Result<Option<FenceToken>, FenceError> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => Ok(durable_token.clone()),
            FenceableToken::Unfenced { current_token, .. } => Ok(Some(current_token.clone())),
            FenceableToken::Fenced {
                current_token,
                fence_token,
            } => {
                assert!(
                    fence_token > current_token,
                    "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                );
                if fence_token.deploy_generation > current_token.deploy_generation {
                    Err(FenceError::DeployGeneration {
                        current_generation: current_token.deploy_generation,
                        fence_generation: fence_token.deploy_generation,
                    })
                } else {
                    assert!(
                        fence_token.epoch > current_token.epoch,
                        "must be fenced by higher token; current={current_token:?}, fence={fence_token:?}"
                    );
                    Err(FenceError::Epoch {
                        current_epoch: current_token.epoch,
                        fence_epoch: fence_token.epoch,
                    })
                }
            }
        }
    }

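    /// Returns the current token, if one is known, without validating fencing.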
    fn token(&self) -> Option<FenceToken> {
        match self {
            FenceableToken::Initializing { durable_token, .. } => durable_token.clone(),
            FenceableToken::Unfenced { current_token, .. } => Some(current_token.clone()),
            FenceableToken::Fenced { current_token, .. } => Some(current_token.clone()),
        }
    }

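    /// Records `token`, transitioning to `Fenced` and returning an error if it fences out the
    /// current token.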
    fn maybe_fence(&mut self, token: FenceToken) -> Result<(), FenceError> {
        match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
                ..
            } => {
                match durable_token {
                    Some(durable_token) => {
                        *durable_token = max(durable_token.clone(), token.clone());
                    }
                    None => {
                        *durable_token = Some(token.clone());
                    }
                }
                if let Some(current_deploy_generation) = current_deploy_generation {
                    if *current_deploy_generation < token.deploy_generation {
                        *self = FenceableToken::Fenced {
                            current_token: FenceToken {
                                deploy_generation: *current_deploy_generation,
                                epoch: token.epoch,
                            },
                            fence_token: token,
                        };
                        self.validate()?;
                    }
                }
            }
            FenceableToken::Unfenced { current_token } => {
                if *current_token < token {
                    *self = FenceableToken::Fenced {
                        current_token: current_token.clone(),
                        fence_token: token,
                    };
                    self.validate()?;
                }
            }
            FenceableToken::Fenced { .. } => {
                self.validate()?;
            }
        }

        Ok(())
    }

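    /// Generates the fence token updates (a retraction of the previous durable token, if any, and
    /// an addition of the new token) together with the `Unfenced` token this catalog should
    /// install. Returns `None` if a token has already been generated for this handle.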
    fn generate_unfenced_token(
        &self,
        mode: Mode,
    ) -> Result<Option<(Vec<(StateUpdateKind, Diff)>, FenceableToken)>, DurableCatalogError> {
        let (durable_token, current_deploy_generation) = match self {
            FenceableToken::Initializing {
                durable_token,
                current_deploy_generation,
            } => (durable_token.clone(), current_deploy_generation.clone()),
            FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. } => return Ok(None),
        };

        let mut fence_updates = Vec::with_capacity(2);

        if let Some(durable_token) = &durable_token {
            fence_updates.push((
                StateUpdateKind::FenceToken(durable_token.clone()),
                Diff::MINUS_ONE,
            ));
        }

        let current_deploy_generation = current_deploy_generation
            .or_else(|| durable_token.as_ref().map(|token| token.deploy_generation))
            .ok_or(DurableCatalogError::Uninitialized)?;
        let mut current_epoch = durable_token
            .map(|token| token.epoch)
            .unwrap_or(MIN_EPOCH)
            .get();
        if matches!(mode, Mode::Writable) {
            current_epoch += 1;
        }
        let current_epoch = Epoch::new(current_epoch).expect("known to be non-zero");
        let current_token = FenceToken {
            deploy_generation: current_deploy_generation,
            epoch: current_epoch,
        };

        fence_updates.push((
            StateUpdateKind::FenceToken(current_token.clone()),
            Diff::ONE,
        ));

        let current_fenceable_token = FenceableToken::Unfenced { current_token };

        Ok(Some((fence_updates, current_fenceable_token)))
    }
}

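/// An error that can occur while appending to the catalog shard: either this catalog was fenced
/// out, or the expected upper did not match the shard's actual upper.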
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompareAndAppendError {
    #[error(transparent)]
    Fence(#[from] FenceError),
    #[error(
        "expected catalog upper {expected_upper:?} did not match actual catalog upper {actual_upper:?}"
    )]
    UpperMismatch {
        expected_upper: Timestamp,
        actual_upper: Timestamp,
    },
}

impl CompareAndAppendError {
    pub(crate) fn unwrap_fence_error(self) -> FenceError {
        match self {
            CompareAndAppendError::Fence(e) => e,
            e @ CompareAndAppendError::UpperMismatch { .. } => {
                panic!("unexpected upper mismatch: {e:?}")
            }
        }
    }
}

impl From<UpperMismatch<Timestamp>> for CompareAndAppendError {
    fn from(upper_mismatch: UpperMismatch<Timestamp>) -> Self {
        Self::UpperMismatch {
            expected_upper: antichain_to_timestamp(upper_mismatch.expected),
            actual_upper: antichain_to_timestamp(upper_mismatch.current),
        }
    }
}

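/// A hook that a [`PersistHandle`] uses to apply a single state update: it can update
/// handle-specific in-memory state, detect fencing via fence tokens, and decide whether the
/// update should be retained in the in-memory snapshot (`Some`) or dropped (`None`).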
pub(crate) trait ApplyUpdate<T: IntoStateUpdateKindJson> {
    fn apply_update(
        &mut self,
        update: StateUpdate<T>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<T>>, FenceError>;
}

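/// A handle for interacting with the catalog's persist shard. It is generic over the state update
/// kind `T` and the update applier `U` so that it can back both the unopened
/// ([`UnopenedPersistCatalogState`]) and the opened (`PersistCatalogState`) catalog state.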
#[derive(Debug)]
pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
    pub(crate) mode: Mode,
    since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    listen: Listen<SourceData, (), Timestamp, StorageDiff>,
    persist_client: PersistClient,
    shard_id: ShardId,
    pub(crate) snapshot: Vec<(T, Timestamp, Diff)>,
    update_applier: U,
    pub(crate) upper: Timestamp,
    fenceable_token: FenceableToken,
    catalog_content_version: semver::Version,
    bootstrap_complete: bool,
    metrics: Arc<Metrics>,
}

impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
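    /// Advances the upper of the catalog upgrade shard by appending an empty batch, retrying on
    /// upper mismatches. Writing to the upgrade shard records this binary's persist version in the
    /// shard state, which `fetch_catalog_upgrade_shard_version` later reads back to detect
    /// incompatible catalog versions.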
    async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self
            .persist_client
            .open_writer(
                upgrade_shard_id,
                Arc::new(UnitSchema::default()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: UPGRADE_SHARD_NAME.to_string(),
                    handle_purpose: "increment durable catalog upgrade shard version".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
        let mut upper = write_handle.fetch_recent_upper().await.clone();
        loop {
            let next_upper = upper
                .iter()
                .map(|timestamp| timestamp.step_forward())
                .collect();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, next_upper)
                .await
                .expect("invalid usage")
            {
                Ok(()) => break,
                Err(upper_mismatch) => {
                    upper = upper_mismatch.current;
                }
            }
        }
    }

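    /// Returns the current upper of the catalog shard (or the locally tracked upper when in
    /// savepoint mode).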
    #[mz_ore::instrument]
    async fn current_upper(&mut self) -> Timestamp {
        match self.mode {
            Mode::Writable | Mode::Readonly => {
                let upper = self.write_handle.fetch_recent_upper().await;
                antichain_to_timestamp(upper.clone())
            }
            Mode::Savepoint => self.upper,
        }
    }

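    /// Appends `updates` to the catalog shard at `commit_ts`, downgrades the since to the new
    /// upper, and syncs the handle up to the new upper. On an upper mismatch the handle is synced
    /// to the current upper before the error is returned, so the caller may retry.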
    #[mz_ore::instrument]
    pub(crate) async fn compare_and_append<S: IntoStateUpdateKindJson>(
        &mut self,
        updates: Vec<(S, Diff)>,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CompareAndAppendError> {
        assert_eq!(self.mode, Mode::Writable);
        assert!(
            commit_ts >= self.upper,
            "expected commit ts, {}, to be greater than or equal to upper, {}",
            commit_ts,
            self.upper
        );

        let contains_fence = if mz_ore::assert::soft_assertions_enabled() {
            let updates: Vec<_> = updates.clone();
            let parsed_updates: Vec<_> = updates
                .clone()
                .into_iter()
                .map(|(update, diff)| {
                    let update: StateUpdateKindJson = update.into();
                    (update, diff)
                })
                .filter_map(|(update, diff)| {
                    <StateUpdateKindJson as TryIntoStateUpdateKind>::try_into(update)
                        .ok()
                        .map(|update| (update, diff))
                })
                .collect();
            let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::MINUS_ONE
            });
            let contains_addition = parsed_updates.iter().any(|(update, diff)| {
                matches!(update, StateUpdateKind::FenceToken(..)) && *diff == Diff::ONE
            });
            let contains_fence = contains_retraction && contains_addition;
            Some((contains_fence, updates))
        } else {
            None
        };

        let updates = updates.into_iter().map(|(kind, diff)| {
            let kind: StateUpdateKindJson = kind.into();
            (
                (Into::<SourceData>::into(kind), ()),
                commit_ts,
                diff.into_inner(),
            )
        });
        let next_upper = commit_ts.step_forward();
        let res = self
            .write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(self.upper),
                Antichain::from_elem(next_upper),
            )
            .await
            .expect("invalid usage");

        if let Err(e @ UpperMismatch { .. }) = res {
            self.sync_to_current_upper().await?;
            if let Some((contains_fence, updates)) = contains_fence {
                assert!(
                    contains_fence,
                    "updates were neither fenced nor fencing and encountered an upper mismatch: {updates:#?}"
                )
            }
            return Err(e.into());
        }

        let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));

        let opaque = *self.since_handle.opaque();
        let downgrade = self
            .since_handle
            .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
            .await;

        match downgrade {
            None => {}
            Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
            Some(Ok(updated)) => soft_assert_or_log!(
                updated == downgrade_to,
                "updated bound should match expected"
            ),
        }
        self.sync(next_upper).await?;
        Ok(next_upper)
    }

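    /// Reads a raw, unconsolidated snapshot of the catalog shard at the current upper and decodes
    /// it into [`StateUpdateKind`]s.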
    #[mz_ore::instrument]
    async fn snapshot_unconsolidated(&mut self) -> Vec<StateUpdate<StateUpdateKind>> {
        let current_upper = self.current_upper().await;

        let mut snapshot = Vec::new();
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, current_upper);
        let mut stream = Box::pin(
            read_handle
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .expect("we have advanced the restart_as_of by the since"),
        );
        while let Some(update) = stream.next().await {
            snapshot.push(update)
        }
        read_handle.expire().await;
        snapshot
            .into_iter()
            .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
            .map(|state_update| state_update.try_into().expect("kind decoding error"))
            .collect()
    }

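    /// Listens for and applies all updates up to the shard's current upper.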
    #[mz_ore::instrument]
    pub(crate) async fn sync_to_current_upper(&mut self) -> Result<(), FenceError> {
        let upper = self.current_upper().await;
        self.sync(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sync(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.metrics.syncs.inc();
        let counter = self.metrics.sync_latency_seconds.clone();
        self.sync_inner(target_upper)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_inner(&mut self, target_upper: Timestamp) -> Result<(), FenceError> {
        self.fenceable_token.validate()?;

        if self.mode == Mode::Savepoint {
            self.upper = max(self.upper, target_upper);
            return Ok(());
        }

        let mut updates: BTreeMap<_, Vec<_>> = BTreeMap::new();

        while self.upper < target_upper {
            let listen_events = self.listen.fetch_next().await;
            for listen_event in listen_events {
                match listen_event {
                    ListenEvent::Progress(upper) => {
                        debug!("synced up to {upper:?}");
                        self.upper = antichain_to_timestamp(upper);
                        while let Some((ts, updates)) = updates.pop_first() {
                            assert!(ts < self.upper, "expected {} < {}", ts, self.upper);
                            let updates = updates.into_iter().map(
                                |update: StateUpdate<StateUpdateKindJson>| {
                                    let kind =
                                        T::try_from(update.kind).expect("kind decoding error");
                                    StateUpdate {
                                        kind,
                                        ts: update.ts,
                                        diff: update.diff,
                                    }
                                },
                            );
                            self.apply_updates(updates)?;
                        }
                    }
                    ListenEvent::Updates(batch_updates) => {
                        for update in batch_updates {
                            let update: StateUpdate<StateUpdateKindJson> = update.into();
                            updates.entry(update.ts).or_default().push(update);
                        }
                    }
                }
            }
        }
        assert_eq!(updates, BTreeMap::new(), "all updates should be applied");
        Ok(())
    }

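    /// Applies `updates` to the in-memory state via the update applier, collecting any fence
    /// errors, and then consolidates the in-memory snapshot.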
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn apply_updates(
        &mut self,
        updates: impl IntoIterator<Item = StateUpdate<T>>,
    ) -> Result<(), FenceError> {
        let mut updates: Vec<_> = updates
            .into_iter()
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();

        differential_dataflow::consolidation::consolidate_updates(&mut updates);

        updates.sort_by(|(_, ts1, diff1), (_, ts2, diff2)| ts1.cmp(ts2).then(diff1.cmp(diff2)));

        let mut errors = Vec::new();

        for (kind, ts, diff) in updates {
            if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
            }

            match self.update_applier.apply_update(
                StateUpdate { kind, ts, diff },
                &mut self.fenceable_token,
                &self.metrics,
            ) {
                Ok(Some(StateUpdate { kind, ts, diff })) => self.snapshot.push((kind, ts, diff)),
                Ok(None) => {}
                Err(err) => errors.push(err),
            }
        }

        errors.sort();
        if let Some(err) = errors.into_iter().next() {
            return Err(err);
        }

        self.consolidate();

        Ok(())
    }

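    /// Advances all updates in the in-memory snapshot to the latest timestamp and consolidates
    /// them, so the snapshot contains at most one entry per distinct piece of state.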
    #[mz_ore::instrument]
    pub(crate) fn consolidate(&mut self) {
        soft_assert_no_log!(
            self.snapshot
                .windows(2)
                .all(|updates| updates[0].1 <= updates[1].1),
            "snapshot should be sorted by timestamp, {:#?}",
            self.snapshot
        );

        let new_ts = self
            .snapshot
            .last()
            .map(|(_, ts, _)| *ts)
            .unwrap_or_else(Timestamp::minimum);
        for (_, ts, _) in &mut self.snapshot {
            *ts = new_ts;
        }
        differential_dataflow::consolidation::consolidate_updates(&mut self.snapshot);
    }

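    /// Syncs to the current upper and then runs `f` over the consolidated in-memory trace.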
    async fn with_trace<R>(
        &mut self,
        f: impl FnOnce(&Vec<(T, Timestamp, Diff)>) -> Result<R, CatalogError>,
    ) -> Result<R, CatalogError> {
        self.sync_to_current_upper().await?;
        f(&self.snapshot)
    }

    async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
        self.persist_client
            .open_leased_reader(
                self.shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "openable durable catalog state temporary reader".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(self.persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage")
    }

    async fn expire(self: Box<Self>) {
        self.write_handle.expire().await;
        self.listen.expire().await;
    }
}

impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
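    /// Syncs to the current upper, builds a [`Snapshot`] of the current catalog contents from the
    /// in-memory trace, and runs `f` over it.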
    async fn with_snapshot<T>(
        &mut self,
        f: impl FnOnce(Snapshot) -> Result<T, CatalogError>,
    ) -> Result<T, CatalogError> {
        fn apply<K, V>(map: &mut BTreeMap<K, V>, key: &K, value: &V, diff: Diff)
        where
            K: Ord + Clone,
            V: Ord + Clone + Debug,
        {
            let key = key.clone();
            let value = value.clone();
            if diff == Diff::ONE {
                let prev = map.insert(key, value);
                assert_eq!(
                    prev, None,
                    "values must be explicitly retracted before inserting a new value"
                );
            } else if diff == Diff::MINUS_ONE {
                let prev = map.remove(&key);
                assert_eq!(
                    prev,
                    Some(value),
                    "retraction does not match existing value"
                );
            }
        }

        self.with_trace(|trace| {
            let mut snapshot = Snapshot::empty();
            for (kind, ts, diff) in trace {
                let diff = *diff;
                if diff != Diff::ONE && diff != Diff::MINUS_ONE {
                    panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
                }

                match kind {
                    StateUpdateKind::AuditLog(_key, ()) => {}
                    StateUpdateKind::Cluster(key, value) => {
                        apply(&mut snapshot.clusters, key, value, diff);
                    }
                    StateUpdateKind::ClusterReplica(key, value) => {
                        apply(&mut snapshot.cluster_replicas, key, value, diff);
                    }
                    StateUpdateKind::Comment(key, value) => {
                        apply(&mut snapshot.comments, key, value, diff);
                    }
                    StateUpdateKind::Config(key, value) => {
                        apply(&mut snapshot.configs, key, value, diff);
                    }
                    StateUpdateKind::Database(key, value) => {
                        apply(&mut snapshot.databases, key, value, diff);
                    }
                    StateUpdateKind::DefaultPrivilege(key, value) => {
                        apply(&mut snapshot.default_privileges, key, value, diff);
                    }
                    StateUpdateKind::FenceToken(_token) => {}
                    StateUpdateKind::IdAllocator(key, value) => {
                        apply(&mut snapshot.id_allocator, key, value, diff);
                    }
                    StateUpdateKind::IntrospectionSourceIndex(key, value) => {
                        apply(&mut snapshot.introspection_sources, key, value, diff);
                    }
                    StateUpdateKind::Item(key, value) => {
                        apply(&mut snapshot.items, key, value, diff);
                    }
                    StateUpdateKind::NetworkPolicy(key, value) => {
                        apply(&mut snapshot.network_policies, key, value, diff);
                    }
                    StateUpdateKind::Role(key, value) => {
                        apply(&mut snapshot.roles, key, value, diff);
                    }
                    StateUpdateKind::Schema(key, value) => {
                        apply(&mut snapshot.schemas, key, value, diff);
                    }
                    StateUpdateKind::Setting(key, value) => {
                        apply(&mut snapshot.settings, key, value, diff);
                    }
                    StateUpdateKind::SourceReferences(key, value) => {
                        apply(&mut snapshot.source_references, key, value, diff);
                    }
                    StateUpdateKind::SystemConfiguration(key, value) => {
                        apply(&mut snapshot.system_configurations, key, value, diff);
                    }
                    StateUpdateKind::SystemObjectMapping(key, value) => {
                        apply(&mut snapshot.system_object_mappings, key, value, diff);
                    }
                    StateUpdateKind::SystemPrivilege(key, value) => {
                        apply(&mut snapshot.system_privileges, key, value, diff);
                    }
                    StateUpdateKind::StorageCollectionMetadata(key, value) => {
                        apply(&mut snapshot.storage_collection_metadata, key, value, diff);
                    }
                    StateUpdateKind::UnfinalizedShard(key, ()) => {
                        apply(&mut snapshot.unfinalized_shards, key, &(), diff);
                    }
                    StateUpdateKind::TxnWalShard((), value) => {
                        apply(&mut snapshot.txn_wal_shard, &(), value, diff);
                    }
                    StateUpdateKind::RoleAuth(key, value) => {
                        apply(&mut snapshot.role_auth, key, value, diff);
                    }
                }
            }
            f(snapshot)
        })
        .await
    }

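    /// Generates an iterator over the full contents of the catalog shard at the current upper by
    /// reading a fresh snapshot from persist (rather than the in-memory trace).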
    #[mz_ore::instrument(level = "debug")]
    async fn persist_snapshot(
        &mut self,
    ) -> impl Iterator<Item = StateUpdate> + DoubleEndedIterator {
        let mut read_handle = self.read_handle().await;
        let as_of = as_of(&read_handle, self.upper);
        let snapshot = snapshot_binary(&mut read_handle, as_of, &self.metrics)
            .await
            .map(|update| update.try_into().expect("kind decoding error"));
        read_handle.expire().await;
        snapshot
    }
}

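/// The in-memory state and update applier of an unopened catalog: it tracks only the configs and
/// settings needed to answer pre-open questions (initialization status, versions, deploy
/// generation) while leaving all other updates in their raw JSON form.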
#[derive(Debug)]
pub(crate) struct UnopenedCatalogStateInner {
    organization_id: Uuid,
    configs: BTreeMap<String, u64>,
    settings: BTreeMap<String, String>,
}

impl UnopenedCatalogStateInner {
    fn new(organization_id: Uuid) -> UnopenedCatalogStateInner {
        UnopenedCatalogStateInner {
            organization_id,
            configs: BTreeMap::new(),
            settings: BTreeMap::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKindJson>,
        current_fence_token: &mut FenceableToken,
        _metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKindJson>>, FenceError> {
        if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
            let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
            match (kind, update.diff) {
                (StateUpdateKind::Config(key, value), Diff::ONE) => {
                    let prev = self.configs.insert(key.key, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
                    let prev = self.configs.remove(&key.key);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::ONE) => {
                    let prev = self.settings.insert(key.name, value.value);
                    assert_eq!(
                        prev, None,
                        "values must be explicitly retracted before inserting a new value"
                    );
                }
                (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
                    let prev = self.settings.remove(&key.name);
                    assert_eq!(
                        prev,
                        Some(value.value),
                        "retraction does not match existing value"
                    );
                }
                (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
                    current_fence_token.maybe_fence(fence_token)?;
                }
                _ => {}
            }
        }

        Ok(Some(update))
    }
}

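/// A durable catalog state that has not been opened yet. It can answer metadata questions (is the
/// catalog initialized, what are the user and deploy versions) and perform debug edits, but it
/// must be opened via [`OpenableDurableCatalogState`] before serving normal catalog operations.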
pub(crate) type UnopenedPersistCatalogState =
    PersistHandle<StateUpdateKindJson, UnopenedCatalogStateInner>;

impl UnopenedPersistCatalogState {
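    /// Creates a new unopened catalog state handle: opens the persist handles for the catalog
    /// shard, checks version compatibility against the upgrade shard, initializes the shard's
    /// upper if needed, and loads and applies the initial snapshot.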
    #[mz_ore::instrument]
    pub(crate) async fn new(
        persist_client: PersistClient,
        organization_id: Uuid,
        version: semver::Version,
        deploy_generation: Option<u64>,
        metrics: Arc<Metrics>,
    ) -> Result<UnopenedPersistCatalogState, DurableCatalogError> {
        let catalog_shard_id = shard_id(organization_id, CATALOG_SEED);
        let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
        debug!(
            ?catalog_shard_id,
            ?upgrade_shard_id,
            "new persist backed catalog state"
        );

        let version_in_upgrade_shard =
            fetch_catalog_upgrade_shard_version(&persist_client, upgrade_shard_id).await;
        if let Some(version_in_upgrade_shard) = version_in_upgrade_shard {
            if mz_persist_client::cfg::check_data_version(&version_in_upgrade_shard, &version)
                .is_err()
            {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version: version_in_upgrade_shard,
                    catalog_version: version,
                });
            }
        }

        let open_handles_start = Instant::now();
        info!("startup: envd serve: catalog init: open handles beginning");
        let since_handle = persist_client
            .open_critical_since(
                catalog_shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state critical since".to_string(),
                },
            )
            .await
            .expect("invalid usage");
        let (mut write_handle, mut read_handle) = persist_client
            .open(
                catalog_shard_id,
                Arc::new(desc()),
                Arc::new(UnitSchema::default()),
                Diagnostics {
                    shard_name: CATALOG_SHARD_NAME.to_string(),
                    handle_purpose: "durable catalog state handles".to_string(),
                },
                USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
            )
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: open handles complete in {:?}",
            open_handles_start.elapsed()
        );

        let upper = {
            const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
            let upper = Antichain::from_elem(Timestamp::minimum());
            let next_upper = Timestamp::minimum().step_forward();
            match write_handle
                .compare_and_append(EMPTY_UPDATES, upper, Antichain::from_elem(next_upper))
                .await
                .expect("invalid usage")
            {
                Ok(()) => next_upper,
                Err(mismatch) => antichain_to_timestamp(mismatch.current),
            }
        };

        let snapshot_start = Instant::now();
        info!("startup: envd serve: catalog init: snapshot beginning");
        let as_of = as_of(&read_handle, upper);
        let snapshot: Vec<_> = snapshot_binary(&mut read_handle, as_of, &metrics)
            .await
            .map(|StateUpdate { kind, ts, diff }| (kind, ts, diff))
            .collect();
        let listen = read_handle
            .listen(Antichain::from_elem(as_of))
            .await
            .expect("invalid usage");
        info!(
            "startup: envd serve: catalog init: snapshot complete in {:?}",
            snapshot_start.elapsed()
        );

        let mut handle = UnopenedPersistCatalogState {
            mode: Mode::Writable,
            since_handle,
            write_handle,
            listen,
            persist_client,
            shard_id: catalog_shard_id,
            snapshot: Vec::new(),
            update_applier: UnopenedCatalogStateInner::new(organization_id),
            upper,
            fenceable_token: FenceableToken::new(deploy_generation),
            catalog_content_version: version,
            bootstrap_complete: false,
            metrics,
        };
        soft_assert_no_log!(
            snapshot.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "snapshot should be consolidated: {snapshot:#?}"
        );

        let apply_start = Instant::now();
        info!("startup: envd serve: catalog init: apply updates beginning");
        let updates = snapshot
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate { kind, ts, diff });
        handle.apply_updates(updates)?;
        info!(
            "startup: envd serve: catalog init: apply updates complete in {:?}",
            apply_start.elapsed()
        );

        if let Some(found_version) = handle.get_catalog_content_version().await? {
            if handle.catalog_content_version < found_version {
                return Err(DurableCatalogError::IncompatiblePersistVersion {
                    found_version,
                    catalog_version: handle.catalog_content_version,
                });
            }
        }

        Ok(handle)
    }

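    /// Opens the catalog in the given `mode`: fences out previous catalogs (in writable mode),
    /// runs any needed upgrades and, for an uninitialized catalog, the initial bootstrap
    /// transaction, and returns the opened state together with an iterator over the audit log.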
    #[mz_ore::instrument]
    async fn open_inner(
        mut self,
        mode: Mode,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        let mut commit_ts = self.upper;
        self.mode = mode;

        match (&self.mode, &self.fenceable_token) {
            (_, FenceableToken::Unfenced { .. } | FenceableToken::Fenced { .. }) => {
                return Err(DurableCatalogError::Internal(
                    "catalog should not have fenced before opening".to_string(),
                )
                .into());
            }
            (
                Mode::Writable | Mode::Savepoint,
                FenceableToken::Initializing {
                    current_deploy_generation: None,
                    ..
                },
            ) => {
                return Err(DurableCatalogError::Internal(format!(
                    "cannot open in mode '{:?}' without a deploy generation",
                    self.mode,
                ))
                .into());
            }
            _ => {}
        }

        let read_only = matches!(self.mode, Mode::Readonly);

        loop {
            self.sync_to_current_upper().await?;
            commit_ts = max(commit_ts, self.upper);
            let (fence_updates, current_fenceable_token) = self
                .fenceable_token
                .generate_unfenced_token(self.mode)?
                .ok_or_else(|| {
                    DurableCatalogError::Internal(
                        "catalog should not have fenced before opening".to_string(),
                    )
                })?;
            debug!(
                ?self.upper,
                ?self.fenceable_token,
                ?current_fenceable_token,
                "fencing previous catalogs"
            );
            if matches!(self.mode, Mode::Writable) {
                match self
                    .compare_and_append(fence_updates.clone(), commit_ts)
                    .await
                {
                    Ok(upper) => {
                        commit_ts = upper;
                    }
                    Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                    Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                        warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                        continue;
                    }
                }
            }
            self.fenceable_token = current_fenceable_token;
            break;
        }

        let is_initialized = self.is_initialized_inner();
        if !matches!(self.mode, Mode::Writable) && !is_initialized {
            return Err(CatalogError::Durable(DurableCatalogError::NotWritable(
                format!(
                    "catalog tables do not exist; will not create in {:?} mode",
                    self.mode
                ),
            )));
        }
        soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

        let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
            .snapshot
            .into_iter()
            .partition(|(update, _, _)| update.is_audit_log());
        self.snapshot = snapshot;
        let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
        let audit_log_handle = AuditLogIterator::new(audit_logs);

        if is_initialized && !read_only {
            commit_ts = upgrade(&mut self, commit_ts).await?;
        }

        debug!(
            ?is_initialized,
            ?self.upper,
            "initializing catalog state"
        );
        let mut catalog = PersistCatalogState {
            mode: self.mode,
            since_handle: self.since_handle,
            write_handle: self.write_handle,
            listen: self.listen,
            persist_client: self.persist_client,
            shard_id: self.shard_id,
            upper: self.upper,
            fenceable_token: self.fenceable_token,
            snapshot: Vec::new(),
            update_applier: CatalogStateInner::new(),
            catalog_content_version: self.catalog_content_version,
            bootstrap_complete: false,
            metrics: self.metrics,
        };
        catalog.metrics.collection_entries.reset();
        catalog
            .metrics
            .collection_entries
            .with_label_values(&[&CollectionType::AuditLog.to_string()])
            .add(audit_log_count.into_inner());
        let updates = self.snapshot.into_iter().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        });
        catalog.apply_updates(updates)?;

        let catalog_content_version = catalog.catalog_content_version.to_string();
        let txn = if is_initialized {
            let mut txn = catalog.transaction().await?;
            txn.set_catalog_content_version(catalog_content_version)?;
            txn
        } else {
            soft_assert_eq_no_log!(
                catalog
                    .snapshot
                    .iter()
                    .filter(|(kind, _, _)| !matches!(kind, StateUpdateKind::FenceToken(_)))
                    .count(),
                0,
                "trace should not contain any updates for an uninitialized catalog: {:#?}",
                catalog.snapshot
            );

            let mut txn = catalog.transaction().await?;
            initialize::initialize(
                &mut txn,
                bootstrap_args,
                initial_ts.into(),
                catalog_content_version,
            )
            .await?;
            txn
        };

        if read_only {
            let (txn_batch, _) = txn.into_parts();
            let updates = StateUpdate::from_txn_batch_ts(txn_batch, catalog.upper);
            catalog.apply_updates(updates)?;
        } else {
            txn.commit_internal(commit_ts).await?;
        }

        if matches!(catalog.mode, Mode::Writable) {
            catalog
                .increment_catalog_upgrade_shard_version(self.update_applier.organization_id)
                .await;
            let write_handle = catalog
                .persist_client
                .open_writer::<SourceData, (), Timestamp, i64>(
                    catalog.write_handle.shard_id(),
                    Arc::new(desc()),
                    Arc::new(UnitSchema::default()),
                    Diagnostics {
                        shard_name: CATALOG_SHARD_NAME.to_string(),
                        handle_purpose: "compact catalog".to_string(),
                    },
                )
                .await
                .expect("invalid usage");
            let fuel = CATALOG_FORCE_COMPACTION_FUEL.handle(catalog.persist_client.dyncfgs());
            let wait = CATALOG_FORCE_COMPACTION_WAIT.handle(catalog.persist_client.dyncfgs());
            let _task = mz_ore::task::spawn(|| "catalog::force_shard_compaction", async move {
                let () =
                    mz_persist_client::cli::admin::dangerous_force_compaction_and_break_pushdown(
                        &write_handle,
                        || fuel.get(),
                        || wait.get(),
                    )
                    .await;
            });
        }

        Ok((Box::new(catalog), audit_log_handle))
    }

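    /// Reports whether the catalog shard has been initialized, based on whether any config entries
    /// exist.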
    #[mz_ore::instrument]
    fn is_initialized_inner(&self) -> bool {
        !self.update_applier.configs.is_empty()
    }

    #[mz_ore::instrument]
    async fn get_current_config(&mut self, key: &str) -> Result<Option<u64>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.configs.get(key).cloned())
    }

    #[mz_ore::instrument]
    pub(crate) async fn get_user_version(&mut self) -> Result<Option<u64>, DurableCatalogError> {
        self.get_current_config(USER_VERSION_KEY).await
    }

    #[mz_ore::instrument]
    async fn get_current_setting(
        &mut self,
        name: &str,
    ) -> Result<Option<String>, DurableCatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.update_applier.settings.get(name).cloned())
    }

    #[mz_ore::instrument]
    async fn get_catalog_content_version(
        &mut self,
    ) -> Result<Option<semver::Version>, DurableCatalogError> {
        let version = self
            .get_current_setting(CATALOG_CONTENT_VERSION_KEY)
            .await?;
        let version = version.map(|version| version.parse().expect("invalid version persisted"));
        Ok(version)
    }
}

#[async_trait]
impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
    #[mz_ore::instrument]
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    #[mz_ore::instrument]
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
        self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
            .boxed()
            .await
            .map(|(catalog, _)| catalog)
    }

    #[mz_ore::instrument]
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError> {
        self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
            .boxed()
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError> {
        Ok(DebugCatalogState(*self))
    }

    #[mz_ore::instrument]
    async fn is_initialized(&mut self) -> Result<bool, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self.is_initialized_inner())
    }

    #[mz_ore::instrument]
    async fn epoch(&mut self) -> Result<Epoch, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .validate()?
            .map(|token| token.epoch)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    #[mz_ore::instrument]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        self.fenceable_token
            .token()
            .map(|token| token.deploy_generation)
            .ok_or(CatalogError::Durable(DurableCatalogError::Uninitialized))
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_MAX_WAIT)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError> {
        let value = self
            .get_current_config(WITH_0DT_DEPLOYMENT_DDL_CHECK_INTERVAL)
            .await?;
        match value {
            None => Ok(None),
            Some(millis) => Ok(Some(Duration::from_millis(millis))),
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError> {
        let value = self
            .get_current_config(ENABLE_0DT_DEPLOYMENT_PANIC_AFTER_TIMEOUT)
            .await?;
        match value {
            None => Ok(None),
            Some(0) => Ok(Some(false)),
            Some(1) => Ok(Some(true)),
            Some(v) => Err(
                DurableCatalogError::from(TryFromProtoError::UnknownEnumVariant(format!(
                    "{v} is not a valid boolean value"
                )))
                .into(),
            ),
        }
    }

    #[mz_ore::instrument]
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError> {
        self.get_current_config(SYSTEM_CONFIG_SYNCED_KEY)
            .await
            .map(|value| value.map(|value| value > 0).unwrap_or(false))
    }

    #[mz_ore::instrument]
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.snapshot_unconsolidated().await;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    #[mz_ore::instrument]
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError> {
        self.sync_to_current_upper().await?;
        if self.is_initialized_inner() {
            let snapshot = self.current_snapshot().await?;
            Ok(Trace::from_snapshot(snapshot))
        } else {
            Err(CatalogError::Durable(DurableCatalogError::Uninitialized))
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }
}

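/// The in-memory state and update applier of an opened catalog: it buffers in-memory
/// [`memory::objects::StateUpdate`]s that callers consume via `sync_updates`.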
#[derive(Debug)]
struct CatalogStateInner {
    updates: VecDeque<memory::objects::StateUpdate>,
}

impl CatalogStateInner {
    fn new() -> CatalogStateInner {
        CatalogStateInner {
            updates: VecDeque::new(),
        }
    }
}

impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
    fn apply_update(
        &mut self,
        update: StateUpdate<StateUpdateKind>,
        current_fence_token: &mut FenceableToken,
        metrics: &Arc<Metrics>,
    ) -> Result<Option<StateUpdate<StateUpdateKind>>, FenceError> {
        if let Some(collection_type) = update.kind.collection_type() {
            metrics
                .collection_entries
                .with_label_values(&[&collection_type.to_string()])
                .add(update.diff.into_inner());
        }

        {
            let update: Option<memory::objects::StateUpdate> = (&update)
                .try_into()
                .expect("invalid persisted update: {update:#?}");
            if let Some(update) = update {
                self.updates.push_back(update);
            }
        }

        match (update.kind, update.diff) {
            (StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
            (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
            (StateUpdateKind::FenceToken(token), Diff::ONE) => {
                current_fence_token.maybe_fence(token)?;
                Ok(None)
            }
            (kind, diff) => Ok(Some(StateUpdate {
                kind,
                ts: update.ts,
                diff,
            })),
        }
    }
}

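/// A durable catalog state backed by a persist shard that has been fully opened.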
type PersistCatalogState = PersistHandle<StateUpdateKind, CatalogStateInner>;

#[async_trait]
impl ReadOnlyDurableCatalogState for PersistCatalogState {
    fn epoch(&self) -> Epoch {
        self.fenceable_token
            .token()
            .expect("opened catalog state must have an epoch")
            .epoch
    }

    #[mz_ore::instrument(level = "debug")]
    async fn expire(self: Box<Self>) {
        self.expire().await
    }

    fn is_bootstrap_complete(&self) -> bool {
        self.bootstrap_complete
    }

    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError> {
        self.sync_to_current_upper().await?;
        let audit_logs: Vec<_> = self
            .persist_snapshot()
            .await
            .filter_map(
                |StateUpdate {
                     kind,
                     ts: _,
                     diff: _,
                 }| match kind {
                    StateUpdateKind::AuditLog(key, ()) => Some(key),
                    _ => None,
                },
            )
            .collect();
        let mut audit_logs: Vec<_> = audit_logs
            .into_iter()
            .map(RustType::from_proto)
            .map_ok(|key: AuditLogKey| key.event)
            .collect::<Result<_, _>>()?;
        audit_logs.sort_by(|a, b| a.sortable_id().cmp(&b.sortable_id()));
        Ok(audit_logs)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError> {
        self.with_trace(|trace| {
            Ok(trace
                .into_iter()
                .rev()
                .filter_map(|(kind, _, _)| match kind {
                    StateUpdateKind::IdAllocator(key, value) if key.name == id_type => {
                        Some(value.next_id)
                    }
                    _ => None,
                })
                .next()
                .expect("must exist"))
        })
        .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError> {
        self.sync_to_current_upper().await?;
        Ok(self
            .fenceable_token
            .token()
            .expect("opened catalogs must have a token")
            .deploy_generation)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError> {
        self.with_snapshot(Ok).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        let upper = self.current_upper().await;
        self.sync_updates(upper).await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sync_updates(
        &mut self,
        target_upper: mz_repr::Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError> {
        self.sync(target_upper).await?;
        let mut updates = Vec::new();
        while let Some(update) = self.update_applier.updates.front() {
            if update.ts >= target_upper {
                break;
            }

            let update = self
                .update_applier
                .updates
                .pop_front()
                .expect("peeked above");
            updates.push(update);
        }
        Ok(updates)
    }

    async fn current_upper(&mut self) -> Timestamp {
        self.current_upper().await
    }
}

#[async_trait]
#[allow(mismatched_lifetime_syntaxes)]
impl DurableCatalogState for PersistCatalogState {
    fn is_read_only(&self) -> bool {
        matches!(self.mode, Mode::Readonly)
    }

    fn is_savepoint(&self) -> bool {
        matches!(self.mode, Mode::Savepoint)
    }

    fn mark_bootstrap_complete(&mut self) {
        self.bootstrap_complete = true;
    }

    #[mz_ore::instrument(level = "debug")]
    async fn transaction(&mut self) -> Result<Transaction, CatalogError> {
        self.metrics.transactions_started.inc();
        let snapshot = self.snapshot().await?;
        let commit_ts = self.upper.clone();
        Transaction::new(self, snapshot, commit_ts)
    }

    #[mz_ore::instrument(level = "debug")]
    async fn commit_transaction(
        &mut self,
        txn_batch: TransactionBatch,
        commit_ts: Timestamp,
    ) -> Result<Timestamp, CatalogError> {
        async fn commit_transaction_inner(
            catalog: &mut PersistCatalogState,
            txn_batch: TransactionBatch,
            commit_ts: Timestamp,
        ) -> Result<Timestamp, CatalogError> {
            assert_eq!(
                catalog.upper, txn_batch.upper,
                "only one transaction at a time is supported"
            );

            assert!(
                commit_ts >= catalog.upper,
                "expected commit ts, {}, to be greater than or equal to upper, {}",
                commit_ts,
                catalog.upper
            );

            let updates = StateUpdate::from_txn_batch(txn_batch).collect();
            debug!("committing updates: {updates:?}");

            let next_upper = match catalog.mode {
                Mode::Writable => catalog
                    .compare_and_append(updates, commit_ts)
                    .await
                    .map_err(|e| e.unwrap_fence_error())?,
                Mode::Savepoint => {
                    let updates = updates.into_iter().map(|(kind, diff)| StateUpdate {
                        kind,
                        ts: commit_ts,
                        diff,
                    });
                    catalog.apply_updates(updates)?;
                    catalog.upper = commit_ts.step_forward();
                    catalog.upper
                }
                Mode::Readonly => {
                    if !updates.is_empty() {
                        return Err(DurableCatalogError::NotWritable(format!(
                            "cannot commit a transaction in a read-only catalog: {updates:#?}"
                        ))
                        .into());
                    }
                    catalog.upper
                }
            };

            Ok(next_upper)
        }
        self.metrics.transaction_commits.inc();
        let counter = self.metrics.transaction_commit_latency_seconds.clone();
        commit_transaction_inner(self, txn_batch, commit_ts)
            .wall_time()
            .inc_by(counter)
            .await
    }

    #[mz_ore::instrument(level = "debug")]
    async fn confirm_leadership(&mut self) -> Result<(), CatalogError> {
        if self.is_read_only() {
            return Ok(());
        }
        self.sync_to_current_upper().await?;
        Ok(())
    }
}

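/// Deterministically derives a shard ID from the organization ID and a seed by hashing them, so
/// that every process in the same organization computes the same catalog and upgrade shard IDs.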
pub fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
    let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
    soft_assert_eq_or_log!(hash.len(), 32, "SHA256 returns 32 bytes (256 bits)");
    let uuid = Uuid::from_slice(&hash[0..16]).expect("from_slice accepts exactly 16 bytes");
    ShardId::from_str(&format!("s{uuid}")).expect("known to be valid")
}

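/// The relation description of the catalog shard: a single non-nullable JSONB column named `data`.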
fn desc() -> RelationDesc {
    RelationDesc::builder()
        .with_column("data", SqlScalarType::Jsonb.nullable(false))
        .finish()
}

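/// Computes a read timestamp for `upper`: one less than the upper, advanced to the read handle's
/// since if necessary.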
fn as_of(
    read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    upper: Timestamp,
) -> Timestamp {
    let since = read_handle.since().clone();
    let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
        panic!("catalog persist shard should be initialized, found upper: {upper:?}")
    });
    soft_assert_or_log!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    as_of.advance_by(since.borrow());
    as_of
}

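/// Fetches the persist applier version recorded in the catalog upgrade shard, if the shard exists,
/// by inspecting the shard's state.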
async fn fetch_catalog_upgrade_shard_version(
    persist_client: &PersistClient,
    upgrade_shard_id: ShardId,
) -> Option<semver::Version> {
    let shard_state = persist_client
        .inspect_shard::<Timestamp>(&upgrade_shard_id)
        .await
        .ok()?;
    let json_state = serde_json::to_value(shard_state).expect("state serialization error");
    let upgrade_version = json_state
        .get("applier_version")
        .cloned()
        .expect("missing applier_version");
    let upgrade_version =
        serde_json::from_value(upgrade_version).expect("version deserialization error");
    Some(upgrade_version)
}

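/// Fetches the raw JSON contents of the catalog shard at `as_of`, recording snapshot metrics, and
/// returns them sorted by timestamp in descending order.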
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
    metrics: &Arc<Metrics>,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    metrics.snapshots_taken.inc();
    let counter = metrics.snapshot_latency_seconds.clone();
    snapshot_binary_inner(read_handle, as_of)
        .wall_time()
        .inc_by(counter)
        .await
}

#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary_inner(
    read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
    as_of: Timestamp,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator + use<> {
    let snapshot = read_handle
        .snapshot_and_fetch(Antichain::from_elem(as_of))
        .await
        .expect("we have advanced the restart_as_of by the since");
    soft_assert_no_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:#?}"
    );
    snapshot
        .into_iter()
        .map(Into::<StateUpdate<StateUpdateKindJson>>::into)
        .sorted_by(|a, b| Ord::cmp(&b.ts, &a.ts))
}

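/// Extracts the single element of `antichain`; the catalog uses a totally ordered time and never
/// finalizes the shard, so its frontiers always contain exactly one element.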
pub(crate) fn antichain_to_timestamp(antichain: Antichain<Timestamp>) -> Timestamp {
    antichain
        .into_option()
        .expect("we use a totally ordered time and never finalize the shard")
}

impl Trace {
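    /// Builds a [`Trace`] by routing each state update in `snapshot` to the corresponding
    /// collection; fence tokens are not part of any collection and are skipped.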
    fn from_snapshot(snapshot: impl IntoIterator<Item = StateUpdate>) -> Trace {
        let mut trace = Trace::new();
        for StateUpdate { kind, ts, diff } in snapshot {
            match kind {
                StateUpdateKind::AuditLog(k, v) => trace.audit_log.values.push(((k, v), ts, diff)),
                StateUpdateKind::Cluster(k, v) => trace.clusters.values.push(((k, v), ts, diff)),
                StateUpdateKind::ClusterReplica(k, v) => {
                    trace.cluster_replicas.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Comment(k, v) => trace.comments.values.push(((k, v), ts, diff)),
                StateUpdateKind::Config(k, v) => trace.configs.values.push(((k, v), ts, diff)),
                StateUpdateKind::Database(k, v) => trace.databases.values.push(((k, v), ts, diff)),
                StateUpdateKind::DefaultPrivilege(k, v) => {
                    trace.default_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::FenceToken(_) => {}
                StateUpdateKind::IdAllocator(k, v) => {
                    trace.id_allocator.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::IntrospectionSourceIndex(k, v) => {
                    trace.introspection_sources.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Item(k, v) => trace.items.values.push(((k, v), ts, diff)),
                StateUpdateKind::NetworkPolicy(k, v) => {
                    trace.network_policies.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::Role(k, v) => trace.roles.values.push(((k, v), ts, diff)),
                StateUpdateKind::Schema(k, v) => trace.schemas.values.push(((k, v), ts, diff)),
                StateUpdateKind::Setting(k, v) => trace.settings.values.push(((k, v), ts, diff)),
                StateUpdateKind::SourceReferences(k, v) => {
                    trace.source_references.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemConfiguration(k, v) => {
                    trace.system_configurations.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemObjectMapping(k, v) => {
                    trace.system_object_mappings.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::SystemPrivilege(k, v) => {
                    trace.system_privileges.values.push(((k, v), ts, diff))
                }
                StateUpdateKind::StorageCollectionMetadata(k, v) => trace
                    .storage_collection_metadata
                    .values
                    .push(((k, v), ts, diff)),
                StateUpdateKind::UnfinalizedShard(k, ()) => {
                    trace.unfinalized_shards.values.push(((k, ()), ts, diff))
                }
                StateUpdateKind::TxnWalShard((), v) => {
                    trace.txn_wal_shard.values.push((((), v), ts, diff))
                }
                StateUpdateKind::RoleAuth(k, v) => trace.role_auth.values.push(((k, v), ts, diff)),
            }
        }
        trace
    }
}

impl UnopenedPersistCatalogState {
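    /// Sets the value stored at `key` in collection `T` to `value`, retracting and returning any
    /// previous value. Writes directly to the catalog shard, retrying on upper mismatches. For
    /// debugging and repair use only.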
    #[mz_ore::instrument]
    pub(crate) async fn debug_edit<T: Collection>(
        &mut self,
        key: T::Key,
        value: T::Value,
    ) -> Result<Option<T::Value>, CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug + Clone,
    {
        let prev_value = loop {
            let key = key.clone();
            let value = value.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let prev_values: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .collect();

            let prev_value = match &prev_values[..] {
                [] => None,
                [((_, v), _, _)] => Some(v.clone()),
                prev_values => panic!("multiple values found for key {key:?}: {prev_values:?}"),
            };

            let mut updates: Vec<_> = prev_values
                .into_iter()
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();
            updates.push((T::update(key, value), Diff::ONE));
            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    updates.extend(fence_updates.clone());
                    match self.compare_and_append(updates, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break prev_value;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(updates, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break prev_value;
                }
            }
        };
        Ok(prev_value)
    }

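    /// Permanently deletes the value (if any) stored at `key` in collection `T`, writing directly
    /// to the catalog shard and retrying on upper mismatches. For debugging and repair use only.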
    #[mz_ore::instrument]
    pub(crate) async fn debug_delete<T: Collection>(
        &mut self,
        key: T::Key,
    ) -> Result<(), CatalogError>
    where
        T::Key: PartialEq + Eq + Debug + Clone,
        T::Value: Debug,
    {
        loop {
            let key = key.clone();
            let snapshot = self.current_snapshot().await?;
            let trace = Trace::from_snapshot(snapshot);
            let collection_trace = T::collection_trace(trace);
            let mut retractions: Vec<_> = collection_trace
                .values
                .into_iter()
                .filter(|((k, _), _, diff)| {
                    soft_assert_eq_or_log!(*diff, Diff::ONE, "trace is consolidated");
                    &key == k
                })
                .map(|((k, v), _, _)| (T::update(k, v), Diff::MINUS_ONE))
                .collect();

            match self.fenceable_token.generate_unfenced_token(self.mode)? {
                Some((fence_updates, current_fenceable_token)) => {
                    retractions.extend(fence_updates.clone());
                    match self.compare_and_append(retractions, self.upper).await {
                        Ok(_) => {
                            self.fenceable_token = current_fenceable_token;
                            break;
                        }
                        Err(CompareAndAppendError::Fence(e)) => return Err(e.into()),
                        Err(e @ CompareAndAppendError::UpperMismatch { .. }) => {
                            warn!("catalog write failed due to upper mismatch, retrying: {e:?}");
                            continue;
                        }
                    }
                }
                None => {
                    self.compare_and_append(retractions, self.upper)
                        .await
                        .map_err(|e| e.unwrap_fence_error())?;
                    break;
                }
            }
        }
        Ok(())
    }

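    /// Returns a consolidated view of the current catalog contents, decoded from the in-memory
    /// trace after syncing to the current upper.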
    async fn current_snapshot(
        &mut self,
    ) -> Result<impl IntoIterator<Item = StateUpdate> + '_, CatalogError> {
        self.sync_to_current_upper().await?;
        self.consolidate();
        Ok(self.snapshot.iter().cloned().map(|(kind, ts, diff)| {
            let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
            StateUpdate { kind, ts, diff }
        }))
    }
}