1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Diff, GlobalId};
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use uuid::Uuid;
27
28use crate::config::ClusterReplicaSizeMap;
29use crate::durable::debug::{DebugCatalogState, Trace};
30pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
31pub use crate::durable::metrics::Metrics;
32pub use crate::durable::objects::state_update::StateUpdate;
33use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34use crate::durable::objects::{AuditLog, Snapshot};
35pub use crate::durable::objects::{
36 Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
37 Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
38 ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
39 StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
40 UnfinalizedShard,
41};
42pub use crate::durable::persist::shard_id;
43use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
44pub use crate::durable::transaction::Transaction;
45use crate::durable::transaction::TransactionBatch;
46pub use crate::durable::upgrade::CATALOG_VERSION;
47use crate::memory;
48
49pub mod debug;
50mod error;
51pub mod initialize;
52mod metrics;
53pub mod objects;
54mod persist;
55mod traits;
56mod transaction;
57mod upgrade;
58
// ID allocator keys. Within this file they are passed as the `id_type` argument of
// `ReadOnlyDurableCatalogState::get_next_id` and `DurableCatalogState::allocate_id`.
// NOTE(review): the string values are presumably persisted in the durable catalog, so renaming
// one would likely require a migration — confirm before editing any value.

/// Key of the database ID allocator.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
/// Key of the schema ID allocator.
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// Key of the user item ID allocator; used by `get_next_user_item_id`, `allocate_user_ids`
/// and `allocate_user_id`.
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// Key of the system item ID allocator; used by `get_next_system_item_id`.
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
/// Key of the user role ID allocator.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// Key of the user cluster ID allocator; used by `allocate_user_cluster_id`.
///
/// Note that the value is `"user_compute"`, not `"user_cluster"`.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
/// Key of the system cluster ID allocator.
///
/// Note that the value is `"system_compute"`, not `"system_cluster"`.
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// Key of the user replica ID allocator; used by `get_next_user_replica_id`.
///
/// Note that the value is `"replica"`, without a `user_` prefix.
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// Key of the system replica ID allocator; used by `get_next_system_replica_id`.
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
/// Key of the audit log ID allocator.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
/// Key of the storage usage ID allocator.
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
/// Key of the user network policy ID allocator.
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
/// Key of the OID allocator.
pub const OID_ALLOC_KEY: &str = "oid";

// The remaining keys are not ID allocator keys.
// NOTE(review): presumably they name individual entries in a catalog settings/config
// collection — confirm against the code that reads them.

/// Key for the catalog content version (crate-private).
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
/// Key for the builtin migration shard.
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
/// Key for the expression cache shard.
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
75
/// Arguments handed by reference to the `open_savepoint`, `open_read_only` and `open` methods
/// of [`OpenableDurableCatalogState`].
// NOTE(review): presumably these are only consulted when the catalog still has to be
// initialized — confirm against the implementation.
#[derive(Clone, Debug)]
pub struct BootstrapArgs {
    /// The cluster replica size map to use.
    pub cluster_replica_size_map: ClusterReplicaSizeMap,
    /// Name of the default cluster replica size.
    // NOTE(review): presumably must be a key of `cluster_replica_size_map` — confirm.
    pub default_cluster_replica_size: String,
    /// Default replication factor for clusters.
    pub default_cluster_replication_factor: u32,
    /// Optional bootstrap role name.
    // NOTE(review): presumably the name of a role created during bootstrap — confirm.
    pub bootstrap_role: Option<String>,
}
83
/// The epoch type returned by the `epoch` methods of the catalog traits: a non-zero `i64`.
pub type Epoch = NonZeroI64;
85
/// A handle to a durable catalog that has not been opened yet.
///
/// The `open*` methods consume the boxed handle and produce an opened catalog; the other
/// methods inspect the unopened catalog through `&mut self`.
#[async_trait]
pub trait OpenableDurableCatalogState: Debug + Send {
    /// Consumes the handle and opens the catalog in savepoint mode.
    ///
    /// Takes an `initial_ts` and the `bootstrap_args`. On success returns the opened
    /// [`DurableCatalogState`] together with an [`AuditLogIterator`].
    // NOTE(review): presumably a savepoint catalog accepts writes without ever making them
    // durable — confirm against the implementation.
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Consumes the handle and opens the catalog in read-only mode.
    ///
    /// Unlike [`Self::open`] and [`Self::open_savepoint`], this takes no initial timestamp
    /// and returns no [`AuditLogIterator`].
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;

    /// Consumes the handle and opens the catalog.
    ///
    /// Takes an `initial_ts` and the `bootstrap_args`. On success returns the opened
    /// [`DurableCatalogState`] together with an [`AuditLogIterator`].
    // NOTE(review): presumably this is the writable mode and initializes the catalog if
    // needed — confirm against the implementation.
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Consumes the handle and opens the catalog as a [`DebugCatalogState`].
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;

    /// Reports whether the catalog is initialized.
    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;

    /// Returns the epoch of the catalog.
    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;

    /// Returns the deployment generation of the catalog.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns the `enable_0dt_deployment` value, if any.
    // NOTE(review): `None` presumably means the value has never been persisted — confirm.
    async fn get_enable_0dt_deployment(&mut self) -> Result<Option<bool>, CatalogError>;

    /// Returns the `0dt_deployment_max_wait` duration, if any.
    // NOTE(review): `None` presumably means the value has never been persisted — confirm.
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;

    /// Returns the `0dt_deployment_ddl_check_interval` duration, if any.
    // NOTE(review): `None` presumably means the value has never been persisted — confirm.
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError>;

    /// Returns the `enable_0dt_deployment_panic_after_timeout` value, if any.
    // NOTE(review): `None` presumably means the value has never been persisted — confirm.
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError>;

    /// Reports whether the system configuration has been synced at least once.
    ///
    /// Note that this is the only method of the trait whose error type is
    /// [`DurableCatalogError`] rather than [`CatalogError`].
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;

    /// Returns an unconsolidated [`Trace`] of the catalog.
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Returns a consolidated [`Trace`] of the catalog.
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Consumes the handle and expires it.
    // NOTE(review): presumably releases the resources held by the unopened catalog —
    // confirm against the implementation.
    async fn expire(self: Box<Self>);
}
205
/// The read-only portion of the API of an opened durable catalog.
///
/// [`DurableCatalogState`] extends this trait with the write API.
#[async_trait]
pub trait ReadOnlyDurableCatalogState: Debug + Send {
    /// Returns the epoch of this catalog handle.
    fn epoch(&self) -> Epoch;

    /// Consumes the handle and expires it.
    // NOTE(review): presumably releases the resources held by the catalog — confirm against
    // the implementation.
    async fn expire(self: Box<Self>);

    /// Reports whether bootstrap has been marked complete.
    fn is_bootstrap_complete(&self) -> bool;

    /// Returns the audit log events of the catalog.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;

    /// Returns the next ID of the allocator named `id_type`.
    ///
    /// The `*_ALLOC_KEY` constants of this module are the values passed as `id_type` by the
    /// provided methods below.
    // NOTE(review): presumably this only reads the next ID and does not allocate it —
    // confirm against the implementation.
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;

    /// Returns the next user item ID; shorthand for
    /// [`get_next_id`](Self::get_next_id) with [`USER_ITEM_ALLOC_KEY`].
    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_ITEM_ALLOC_KEY).await
    }

    /// Returns the next system item ID; shorthand for
    /// [`get_next_id`](Self::get_next_id) with [`SYSTEM_ITEM_ALLOC_KEY`].
    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
    }

    /// Returns the next system replica ID; shorthand for
    /// [`get_next_id`](Self::get_next_id) with [`SYSTEM_REPLICA_ID_ALLOC_KEY`].
    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
    }

    /// Returns the next user replica ID; shorthand for
    /// [`get_next_id`](Self::get_next_id) with [`USER_REPLICA_ID_ALLOC_KEY`].
    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
    }

    /// Returns the deployment generation of the catalog.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns a [`Snapshot`] of the catalog.
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;

    /// Syncs the catalog and returns the resulting in-memory state updates.
    // NOTE(review): presumably syncs up to the current upper of the catalog, as the name
    // suggests — confirm against the implementation.
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Syncs the catalog with respect to `target_upper` and returns the resulting in-memory
    /// state updates.
    // NOTE(review): presumably blocks until the catalog upper reaches `target_upper` —
    // confirm against the implementation.
    async fn sync_updates(
        &mut self,
        target_upper: Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Returns the current upper of the catalog. This method is infallible.
    async fn current_upper(&mut self) -> Timestamp;
}
287
288#[async_trait]
290#[allow(elided_named_lifetimes)]
291pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
292 fn is_read_only(&self) -> bool;
294
295 fn is_savepoint(&self) -> bool;
297
298 fn mark_bootstrap_complete(&mut self);
300
301 async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
303
304 async fn commit_transaction(
312 &mut self,
313 txn_batch: TransactionBatch,
314 commit_ts: Timestamp,
315 ) -> Result<Timestamp, CatalogError>;
316
317 async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
321
322 #[mz_ore::instrument(level = "debug")]
326 async fn allocate_id(
327 &mut self,
328 id_type: &str,
329 amount: u64,
330 commit_ts: Timestamp,
331 ) -> Result<Vec<u64>, CatalogError> {
332 if amount == 0 {
333 return Ok(Vec::new());
334 }
335 let mut txn = self.transaction().await?;
336 let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
337 txn.commit_internal(commit_ts).await?;
338 Ok(ids)
339 }
340
341 async fn allocate_user_ids(
345 &mut self,
346 amount: u64,
347 commit_ts: Timestamp,
348 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
349 let ids = self
350 .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
351 .await?;
352 let ids = ids
353 .iter()
354 .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
355 .collect();
356 Ok(ids)
357 }
358
359 async fn allocate_user_id(
363 &mut self,
364 commit_ts: Timestamp,
365 ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
366 let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
367 let id = id.into_element();
368 Ok((CatalogItemId::User(id), GlobalId::User(id)))
369 }
370
371 async fn allocate_user_cluster_id(
375 &mut self,
376 commit_ts: Timestamp,
377 ) -> Result<ClusterId, CatalogError> {
378 let id = self
379 .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
380 .await?
381 .into_element();
382 Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
383 }
384}
385
386trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
387impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
388
/// An iterator over audit log entries, each paired with a [`Timestamp`].
///
/// The entries come from a boxed inner iterator, which keeps the concrete iterator type out
/// of the public signature.
#[derive(Debug)]
pub struct AuditLogIterator {
    /// The wrapped iterator; every call to `next` is forwarded to it.
    audit_logs: Box<dyn AuditLogIteratorTrait>,
}
396
397impl AuditLogIterator {
398 fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
399 let audit_logs = audit_logs
400 .into_iter()
401 .map(|(kind, ts, diff)| {
402 assert_eq!(
403 diff,
404 Diff::ONE,
405 "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
406 );
407 assert!(
408 kind.is_audit_log(),
409 "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
410 );
411 let id = kind.audit_log_id();
412 (kind, ts, id)
413 })
414 .sorted_by_key(|(_, ts, id)| (*ts, *id))
415 .map(|(kind, ts, _id)| (kind, ts))
416 .rev()
417 .map(|(kind, ts)| {
418 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
420 let kind: Option<memory::objects::StateUpdateKind> = (&kind)
421 .try_into()
422 .expect("invalid persisted update: {update:#?}");
423 let kind = kind.expect("audit log always produces im-memory updates");
424 let audit_log = match kind {
425 memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
426 kind => unreachable!("invalid kind: {kind:?}"),
427 };
428 (audit_log, ts)
429 });
430 Self {
431 audit_logs: Box::new(audit_logs),
432 }
433 }
434}
435
436impl Iterator for AuditLogIterator {
437 type Item = (AuditLog, Timestamp);
438
439 fn next(&mut self) -> Option<Self::Item> {
440 self.audit_logs.next()
441 }
442}
443
/// Builder for an unopened, persist-backed catalog state, intended for tests.
///
/// Every field is forwarded unchanged to [`persist_backed_catalog_state`] by `build`.
#[derive(Debug, Clone)]
pub struct TestCatalogStateBuilder {
    /// The persist client backing the catalog.
    persist_client: PersistClient,
    /// The organization ID; a random UUID unless overridden.
    organization_id: Uuid,
    /// The version; `0.0.0` unless overridden.
    version: semver::Version,
    /// The deploy generation; `None` unless overridden.
    deploy_generation: Option<u64>,
    /// The catalog metrics; backed by a fresh registry unless overridden.
    metrics: Arc<Metrics>,
}
453
454impl TestCatalogStateBuilder {
455 pub fn new(persist_client: PersistClient) -> Self {
456 Self {
457 persist_client,
458 organization_id: Uuid::new_v4(),
459 version: semver::Version::new(0, 0, 0),
460 deploy_generation: None,
461 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
462 }
463 }
464
465 pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
466 self.organization_id = organization_id;
467 self
468 }
469
470 pub fn with_version(mut self, version: semver::Version) -> Self {
471 self.version = version;
472 self
473 }
474
475 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
476 self.deploy_generation = Some(deploy_generation);
477 self
478 }
479
480 pub fn with_default_deploy_generation(self) -> Self {
481 self.with_deploy_generation(0)
482 }
483
484 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
485 self.metrics = metrics;
486 self
487 }
488
489 pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
490 persist_backed_catalog_state(
491 self.persist_client,
492 self.organization_id,
493 self.version,
494 self.deploy_generation,
495 self.metrics,
496 )
497 .await
498 }
499
500 pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
501 self.expect_build("failed to build").await
502 }
503
504 pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
505 self.build().await.expect(msg)
506 }
507}
508
509pub async fn persist_backed_catalog_state(
513 persist_client: PersistClient,
514 organization_id: Uuid,
515 version: semver::Version,
516 deploy_generation: Option<u64>,
517 metrics: Arc<Metrics>,
518) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
519 let state = UnopenedPersistCatalogState::new(
520 persist_client,
521 organization_id,
522 version,
523 deploy_generation,
524 metrics,
525 )
526 .await?;
527 Ok(Box::new(state))
528}
529
530pub fn test_bootstrap_args() -> BootstrapArgs {
531 BootstrapArgs {
532 default_cluster_replica_size: "1".into(),
533 default_cluster_replication_factor: 2,
534 bootstrap_role: None,
535 cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
536 }
537}