1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_persist_types::ShardId;
25use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlScalarType};
26use mz_sql::catalog::CatalogError as SqlCatalogError;
27use uuid::Uuid;
28
29use crate::config::ClusterReplicaSizeMap;
30use crate::durable::debug::{DebugCatalogState, Trace};
31pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
32pub use crate::durable::metrics::Metrics;
33pub use crate::durable::objects::state_update::StateUpdate;
34use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
35use crate::durable::objects::{AuditLog, Snapshot};
36pub use crate::durable::objects::{
37 Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
38 Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
39 ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
40 StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
41 UnfinalizedShard,
42};
43pub use crate::durable::persist::shard_id;
44use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
45pub use crate::durable::transaction::Transaction;
46use crate::durable::transaction::TransactionBatch;
47pub use crate::durable::upgrade::CATALOG_VERSION;
48use crate::memory;
49
50pub mod debug;
51mod error;
52pub mod initialize;
53mod metrics;
54pub mod objects;
55mod persist;
56mod traits;
57mod transaction;
58mod upgrade;
59
// ID allocator keys. Each `*_ALLOC_KEY` names an ID counter and is passed as the `id_type`
// argument of `ReadOnlyDurableCatalogState::get_next_id` / `DurableCatalogState::allocate_id`
// (see e.g. `get_next_user_item_id` and `allocate_user_cluster_id` below).
// NOTE(review): these string values are presumably persisted in the durable catalog, so
// changing them would likely require a migration — confirm.

/// Allocator key for database IDs.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
/// Allocator key for schema IDs.
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// Allocator key for user item IDs (used by `allocate_user_id` / `allocate_user_ids`).
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// Allocator key for system item IDs (used by `get_next_system_item_id`).
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
/// Allocator key for user role IDs.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// Allocator key for user cluster IDs (used by `allocate_user_cluster_id`). Note that the
/// key's value is `"user_compute"`, not `"user_cluster"` — NOTE(review): presumably kept for
/// compatibility with existing data; confirm.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
/// Allocator key for system cluster IDs (value `"system_compute"`, see note above on
/// `USER_CLUSTER_ID_ALLOC_KEY`).
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// Allocator key for user cluster replica IDs (used by `get_next_user_replica_id`).
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// Allocator key for system cluster replica IDs (used by `get_next_system_replica_id`).
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
/// Allocator key for audit log event IDs.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
/// Allocator key for storage usage record IDs.
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
/// Allocator key for user network policy IDs.
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
/// Allocator key for OIDs.
pub const OID_ALLOC_KEY: &str = "oid";

// The keys below are not ID allocators. NOTE(review): they presumably name entries in a
// settings-like durable collection — confirm against their users.

/// Key for the catalog content version.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
/// Key under which the builtin migration shard is recorded.
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
/// Key under which the expression cache shard is recorded.
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
/// Key for the mock authentication nonce.
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
77
/// Arguments used when opening (and, if needed, bootstrapping) a durable catalog.
///
/// Passed by reference to the `open*` methods of `OpenableDurableCatalogState`.
#[derive(Clone, Debug)]
pub struct BootstrapArgs {
    /// Mapping from cluster replica size names to their configurations.
    pub cluster_replica_size_map: ClusterReplicaSizeMap,
    /// Replica size used for default clusters.
    /// NOTE(review): presumably must be a key of `cluster_replica_size_map` — confirm.
    pub default_cluster_replica_size: String,
    /// Replication factor used for default clusters (assumed from the name — confirm).
    pub default_cluster_replication_factor: u32,
    /// Optional role to create during bootstrap (assumed from the name — confirm).
    pub bootstrap_role: Option<String>,
}
85
/// A non-zero, signed 64-bit catalog epoch, as returned by the `epoch` methods of the catalog
/// state traits. NOTE(review): presumably used to fence out stale catalog writers (see
/// `FenceError`) — confirm.
pub type Epoch = NonZeroI64;
87
#[async_trait]
/// A durable catalog state that has not been opened yet.
///
/// The `open*` methods consume the handle (`self: Box<Self>`) and produce an opened state; the
/// remaining methods can be called on the still-unopened handle.
pub trait OpenableDurableCatalogState: Debug + Send {
    /// Opens the catalog in savepoint mode.
    ///
    /// Returns the opened state together with an [`AuditLogIterator`] over the existing audit
    /// log. NOTE(review): savepoint semantics (presumably, writes are not durably committed)
    /// are defined by implementors, and `initial_ts` is presumably only used when the catalog
    /// must be initialized — confirm.
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog in read-only mode.
    ///
    /// Unlike `open` and `open_savepoint`, this takes no initial timestamp and returns no audit
    /// log iterator. NOTE(review): presumably because a read-only open never initializes the
    /// catalog — confirm.
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;

    /// Opens the catalog for reading and writing.
    ///
    /// Returns the opened state together with an [`AuditLogIterator`] over the existing audit
    /// log. NOTE(review): `initial_ts` is presumably only used when the catalog must be
    /// initialized — confirm.
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog for debugging, returning a [`DebugCatalogState`].
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;

    /// Reports whether the durable catalog has been initialized.
    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;

    /// Returns the catalog's current [`Epoch`].
    /// NOTE(review): behavior on an uninitialized catalog is implementation-defined — confirm.
    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;

    /// Returns the deployment generation recorded in the durable catalog.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns the durably stored maximum wait for a 0dt deployment, if any.
    /// NOTE(review): `None` presumably means the value is unset — confirm.
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;

    /// Returns the durably stored DDL check interval for a 0dt deployment, if any.
    /// NOTE(review): `None` presumably means the value is unset — confirm.
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError>;

    /// Returns the durably stored flag controlling whether a 0dt deployment panics after its
    /// timeout, if any. NOTE(review): `None` presumably means the value is unset — confirm.
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError>;

    /// Reports whether the system configuration has been synced at least once
    /// (assumed from the name — confirm).
    ///
    /// Note: unlike the other methods, this returns a [`DurableCatalogError`] rather than a
    /// [`CatalogError`].
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;

    /// Returns a [`Trace`] of the catalog contents without consolidation.
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Returns a [`Trace`] of the catalog contents after consolidation.
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Consumes the unopened handle without opening it.
    /// NOTE(review): presumably releases any resources it holds — confirm with implementors.
    async fn expire(self: Box<Self>);
}
200
#[async_trait]
/// Read operations on an opened durable catalog.
///
/// `DurableCatalogState` extends this trait with transactions and ID allocation.
pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
    /// Returns the [`Epoch`] of this catalog handle.
    /// NOTE(review): presumably the epoch under which it was opened — confirm.
    fn epoch(&self) -> Epoch;

    /// Returns the [`Metrics`] this handle reports to (for example,
    /// `DurableCatalogState::allocate_id` records `allocate_id_seconds` here).
    fn metrics(&self) -> &Metrics;

    /// Consumes the handle.
    /// NOTE(review): presumably releases any resources it holds — confirm with implementors.
    async fn expire(self: Box<Self>);

    /// Reports whether bootstrap has completed.
    /// NOTE(review): presumably set by `DurableCatalogState::mark_bootstrap_complete` — confirm.
    fn is_bootstrap_complete(&self) -> bool;

    /// Returns all audit log events stored in the catalog.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;

    /// Returns the next ID of the allocator named `id_type` (one of the `*_ALLOC_KEY`
    /// constants). NOTE(review): presumably does not reserve the ID — confirm.
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;

    /// [`Self::get_next_id`] for [`USER_ITEM_ALLOC_KEY`].
    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_ITEM_ALLOC_KEY).await
    }

    /// [`Self::get_next_id`] for [`SYSTEM_ITEM_ALLOC_KEY`].
    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
    }

    /// [`Self::get_next_id`] for [`SYSTEM_REPLICA_ID_ALLOC_KEY`].
    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
    }

    /// [`Self::get_next_id`] for [`USER_REPLICA_ID_ALLOC_KEY`].
    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
    }

    /// Returns the deployment generation recorded in the durable catalog.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns a [`Snapshot`] of the catalog contents.
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;

    /// Returns the in-memory updates needed to catch up to the catalog's current state.
    /// NOTE(review): "current" is presumably the durable upper at call time — confirm.
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Returns the in-memory updates needed to catch up to `target_upper`.
    async fn sync_updates(
        &mut self,
        target_upper: Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Returns the catalog's current upper [`Timestamp`].
    async fn current_upper(&mut self) -> Timestamp;
}
285
286#[async_trait]
288#[allow(mismatched_lifetime_syntaxes)]
289pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
290 fn is_read_only(&self) -> bool;
292
293 fn is_savepoint(&self) -> bool;
295
296 async fn mark_bootstrap_complete(&mut self);
298
299 async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
301
302 async fn commit_transaction(
310 &mut self,
311 txn_batch: TransactionBatch,
312 commit_ts: Timestamp,
313 ) -> Result<Timestamp, CatalogError>;
314
315 async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
319
320 #[mz_ore::instrument(level = "debug")]
324 async fn allocate_id(
325 &mut self,
326 id_type: &str,
327 amount: u64,
328 commit_ts: Timestamp,
329 ) -> Result<Vec<u64>, CatalogError> {
330 let start = Instant::now();
331 if amount == 0 {
332 return Ok(Vec::new());
333 }
334 let mut txn = self.transaction().await?;
335 let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
336 txn.commit_internal(commit_ts).await?;
337 self.metrics()
338 .allocate_id_seconds
339 .observe(start.elapsed().as_secs_f64());
340 Ok(ids)
341 }
342
343 async fn allocate_user_ids(
347 &mut self,
348 amount: u64,
349 commit_ts: Timestamp,
350 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
351 let ids = self
352 .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
353 .await?;
354 let ids = ids
355 .iter()
356 .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
357 .collect();
358 Ok(ids)
359 }
360
361 async fn allocate_user_id(
365 &mut self,
366 commit_ts: Timestamp,
367 ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
368 let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
369 let id = id.into_element();
370 Ok((CatalogItemId::User(id), GlobalId::User(id)))
371 }
372
373 async fn allocate_user_cluster_id(
377 &mut self,
378 commit_ts: Timestamp,
379 ) -> Result<ClusterId, CatalogError> {
380 let id = self
381 .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
382 .await?
383 .into_element();
384 Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
385 }
386
387 fn shard_id(&self) -> ShardId;
388}
389
390trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
391impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
392
393#[derive(Debug)]
395pub struct AuditLogIterator {
396 audit_logs: Box<dyn AuditLogIteratorTrait>,
399}
400
401impl AuditLogIterator {
402 fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
403 let audit_logs = audit_logs
404 .into_iter()
405 .map(|(kind, ts, diff)| {
406 assert_eq!(
407 diff,
408 Diff::ONE,
409 "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
410 );
411 assert!(
412 kind.is_audit_log(),
413 "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
414 );
415 let id = kind.audit_log_id();
416 (kind, ts, id)
417 })
418 .sorted_by_key(|(_, ts, id)| (*ts, *id))
419 .map(|(kind, ts, _id)| (kind, ts))
420 .rev()
421 .map(|(kind, ts)| {
422 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
424 let kind: Option<memory::objects::StateUpdateKind> = (&kind)
425 .try_into()
426 .expect("invalid persisted update: {update:#?}");
427 let kind = kind.expect("audit log always produces im-memory updates");
428 let audit_log = match kind {
429 memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
430 kind => unreachable!("invalid kind: {kind:?}"),
431 };
432 (audit_log, ts)
433 });
434 Self {
435 audit_logs: Box::new(audit_logs),
436 }
437 }
438}
439
440impl Iterator for AuditLogIterator {
441 type Item = (AuditLog, Timestamp);
442
443 fn next(&mut self) -> Option<Self::Item> {
444 self.audit_logs.next()
445 }
446}
447
448pub fn persist_desc() -> RelationDesc {
451 RelationDesc::builder()
452 .with_column("data", SqlScalarType::Jsonb.nullable(false))
453 .finish()
454}
455
456#[derive(Debug, Clone)]
458pub struct TestCatalogStateBuilder {
459 persist_client: PersistClient,
460 organization_id: Uuid,
461 version: semver::Version,
462 deploy_generation: Option<u64>,
463 metrics: Arc<Metrics>,
464}
465
466impl TestCatalogStateBuilder {
467 pub fn new(persist_client: PersistClient) -> Self {
468 Self {
469 persist_client,
470 organization_id: Uuid::new_v4(),
471 version: semver::Version::new(0, 0, 0),
472 deploy_generation: None,
473 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
474 }
475 }
476
477 pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
478 self.organization_id = organization_id;
479 self
480 }
481
482 pub fn with_version(mut self, version: semver::Version) -> Self {
483 self.version = version;
484 self
485 }
486
487 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
488 self.deploy_generation = Some(deploy_generation);
489 self
490 }
491
492 pub fn with_default_deploy_generation(self) -> Self {
493 self.with_deploy_generation(0)
494 }
495
496 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
497 self.metrics = metrics;
498 self
499 }
500
501 pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
502 persist_backed_catalog_state(
503 self.persist_client,
504 self.organization_id,
505 self.version,
506 self.deploy_generation,
507 self.metrics,
508 )
509 .await
510 }
511
512 pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
513 self.expect_build("failed to build").await
514 }
515
516 pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
517 self.build().await.expect(msg)
518 }
519}
520
521pub async fn persist_backed_catalog_state(
525 persist_client: PersistClient,
526 organization_id: Uuid,
527 version: semver::Version,
528 deploy_generation: Option<u64>,
529 metrics: Arc<Metrics>,
530) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
531 let state = UnopenedPersistCatalogState::new(
532 persist_client,
533 organization_id,
534 version,
535 deploy_generation,
536 metrics,
537 )
538 .await?;
539 Ok(Box::new(state))
540}
541
542pub fn test_bootstrap_args() -> BootstrapArgs {
543 BootstrapArgs {
544 default_cluster_replica_size: "scale=1,workers=1".into(),
545 default_cluster_replication_factor: 1,
546 bootstrap_role: None,
547 cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
548 }
549}