1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_persist_types::ShardId;
25use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlScalarType};
26use mz_sql::catalog::CatalogError as SqlCatalogError;
27use uuid::Uuid;
28
29use crate::config::ClusterReplicaSizeMap;
30use crate::durable::debug::{DebugCatalogState, Trace};
31pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
32pub use crate::durable::metrics::Metrics;
33use crate::durable::objects::AuditLog;
34pub use crate::durable::objects::Snapshot;
35pub use crate::durable::objects::state_update::StateUpdate;
36use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
37pub use crate::durable::objects::{
38 Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
39 Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
40 ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
41 StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
42 UnfinalizedShard,
43};
44pub use crate::durable::persist::shard_id;
45use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
46pub use crate::durable::transaction::Transaction;
47use crate::durable::transaction::TransactionBatch;
48pub use crate::durable::upgrade::CATALOG_VERSION;
49use crate::memory;
50
51pub mod debug;
52mod error;
53pub mod initialize;
54mod metrics;
55pub mod objects;
56mod persist;
57mod traits;
58mod transaction;
59mod upgrade;
60
// Keys naming the individual ID allocators. Within this file some of them are passed
// as the `id_type` argument of `get_next_id` / `allocate_id`; the rest are presumably
// used the same way elsewhere (confirm against callers).
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
pub const USER_ITEM_ALLOC_KEY: &str = "user";
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
// Note: the two cluster keys below spell `compute`, not `cluster`, in their values.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
pub const OID_ALLOC_KEY: &str = "oid";

// Keys that are not referenced in this file.
// NOTE(review): presumably these name individual persisted settings rather than ID
// allocators; verify against their users.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
78
79#[derive(Clone, Debug)]
80pub struct BootstrapArgs {
81 pub cluster_replica_size_map: ClusterReplicaSizeMap,
82 pub default_cluster_replica_size: String,
83 pub default_cluster_replication_factor: u32,
84 pub bootstrap_role: Option<String>,
85}
86
/// The epoch of a catalog handle: a signed 64-bit integer that can never be zero.
pub type Epoch = NonZeroI64;
88
89#[async_trait]
93pub trait OpenableDurableCatalogState: Debug + Send {
94 async fn open_savepoint(
113 mut self: Box<Self>,
114 initial_ts: Timestamp,
115 bootstrap_args: &BootstrapArgs,
116 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
117
118 async fn open_read_only(
124 mut self: Box<Self>,
125 bootstrap_args: &BootstrapArgs,
126 ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
127
128 async fn open(
136 mut self: Box<Self>,
137 initial_ts: Timestamp,
138 bootstrap_args: &BootstrapArgs,
139 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
140
141 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
144
145 async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
147
148 async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
158
159 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
162
163 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
169
170 async fn get_0dt_deployment_ddl_check_interval(
176 &mut self,
177 ) -> Result<Option<Duration>, CatalogError>;
178
179 async fn get_enable_0dt_deployment_panic_after_timeout(
186 &mut self,
187 ) -> Result<Option<bool>, CatalogError>;
188
189 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
191
192 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
194
195 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
197
198 async fn expire(self: Box<Self>);
200}
201
202#[async_trait]
204pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
205 fn epoch(&self) -> Epoch;
215
216 fn metrics(&self) -> &Metrics;
218
219 async fn expire(self: Box<Self>);
221
222 fn is_bootstrap_complete(&self) -> bool;
224
225 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
231
232 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
234
235 async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
237 self.get_next_id(USER_ITEM_ALLOC_KEY).await
238 }
239
240 async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
242 self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
243 }
244
245 async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
247 self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
248 }
249
250 async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
252 self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
253 }
254
255 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
257
258 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
260
261 async fn sync_to_current_updates(
267 &mut self,
268 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
269
270 async fn sync_updates(
279 &mut self,
280 target_upper: Timestamp,
281 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
282
283 async fn current_upper(&mut self) -> Timestamp;
285}
286
287#[async_trait]
289#[allow(mismatched_lifetime_syntaxes)]
290pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
291 fn is_read_only(&self) -> bool;
293
294 fn is_savepoint(&self) -> bool;
296
297 async fn mark_bootstrap_complete(&mut self);
299
300 async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
302
303 fn transaction_from_snapshot(
309 &mut self,
310 snapshot: Snapshot,
311 ) -> Result<Transaction, CatalogError>;
312
313 async fn commit_transaction(
321 &mut self,
322 txn_batch: TransactionBatch,
323 commit_ts: Timestamp,
324 ) -> Result<Timestamp, CatalogError>;
325
326 async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError>;
331
332 #[mz_ore::instrument(level = "debug")]
336 async fn allocate_id(
337 &mut self,
338 id_type: &str,
339 amount: u64,
340 commit_ts: Timestamp,
341 ) -> Result<Vec<u64>, CatalogError> {
342 let start = Instant::now();
343 if amount == 0 {
344 return Ok(Vec::new());
345 }
346 let mut txn = self.transaction().await?;
347 let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
348 txn.commit_internal(commit_ts).await?;
349 self.metrics()
350 .allocate_id_seconds
351 .observe(start.elapsed().as_secs_f64());
352 Ok(ids)
353 }
354
355 async fn allocate_user_ids(
359 &mut self,
360 amount: u64,
361 commit_ts: Timestamp,
362 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
363 let ids = self
364 .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
365 .await?;
366 let ids = ids
367 .iter()
368 .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
369 .collect();
370 Ok(ids)
371 }
372
373 async fn allocate_user_id(
377 &mut self,
378 commit_ts: Timestamp,
379 ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
380 let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
381 let id = id.into_element();
382 Ok((CatalogItemId::User(id), GlobalId::User(id)))
383 }
384
385 async fn allocate_user_cluster_id(
389 &mut self,
390 commit_ts: Timestamp,
391 ) -> Result<ClusterId, CatalogError> {
392 let id = self
393 .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
394 .await?
395 .into_element();
396 Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
397 }
398
399 fn shard_id(&self) -> ShardId;
400}
401
402trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
403impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
404
405#[derive(Debug)]
407pub struct AuditLogIterator {
408 audit_logs: Box<dyn AuditLogIteratorTrait>,
411}
412
413impl AuditLogIterator {
414 fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
415 let audit_logs = audit_logs
416 .into_iter()
417 .map(|(kind, ts, diff)| {
418 assert_eq!(
419 diff,
420 Diff::ONE,
421 "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
422 );
423 assert!(
424 kind.is_audit_log(),
425 "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
426 );
427 let id = kind.audit_log_id();
428 (kind, ts, id)
429 })
430 .sorted_by_key(|(_, ts, id)| (*ts, *id))
431 .map(|(kind, ts, _id)| (kind, ts))
432 .rev()
433 .map(|(kind, ts)| {
434 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
436 let kind: Option<memory::objects::StateUpdateKind> = (&kind)
437 .try_into()
438 .expect("invalid persisted update: {update:#?}");
439 let kind = kind.expect("audit log always produces im-memory updates");
440 let audit_log = match kind {
441 memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
442 kind => unreachable!("invalid kind: {kind:?}"),
443 };
444 (audit_log, ts)
445 });
446 Self {
447 audit_logs: Box::new(audit_logs),
448 }
449 }
450}
451
452impl Iterator for AuditLogIterator {
453 type Item = (AuditLog, Timestamp);
454
455 fn next(&mut self) -> Option<Self::Item> {
456 self.audit_logs.next()
457 }
458}
459
460pub fn persist_desc() -> RelationDesc {
463 RelationDesc::builder()
464 .with_column("data", SqlScalarType::Jsonb.nullable(false))
465 .finish()
466}
467
468#[derive(Debug, Clone)]
470pub struct TestCatalogStateBuilder {
471 persist_client: PersistClient,
472 organization_id: Uuid,
473 version: semver::Version,
474 deploy_generation: Option<u64>,
475 metrics: Arc<Metrics>,
476}
477
478impl TestCatalogStateBuilder {
479 pub fn new(persist_client: PersistClient) -> Self {
480 Self {
481 persist_client,
482 organization_id: Uuid::new_v4(),
483 version: semver::Version::new(0, 0, 0),
484 deploy_generation: None,
485 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
486 }
487 }
488
489 pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
490 self.organization_id = organization_id;
491 self
492 }
493
494 pub fn with_version(mut self, version: semver::Version) -> Self {
495 self.version = version;
496 self
497 }
498
499 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
500 self.deploy_generation = Some(deploy_generation);
501 self
502 }
503
504 pub fn with_default_deploy_generation(self) -> Self {
505 self.with_deploy_generation(0)
506 }
507
508 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
509 self.metrics = metrics;
510 self
511 }
512
513 pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
514 persist_backed_catalog_state(
515 self.persist_client,
516 self.organization_id,
517 self.version,
518 self.deploy_generation,
519 self.metrics,
520 )
521 .await
522 }
523
524 pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
525 self.expect_build("failed to build").await
526 }
527
528 pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
529 self.build().await.expect(msg)
530 }
531}
532
533pub async fn persist_backed_catalog_state(
537 persist_client: PersistClient,
538 organization_id: Uuid,
539 version: semver::Version,
540 deploy_generation: Option<u64>,
541 metrics: Arc<Metrics>,
542) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
543 let state = UnopenedPersistCatalogState::new(
544 persist_client,
545 organization_id,
546 version,
547 deploy_generation,
548 metrics,
549 )
550 .await?;
551 Ok(Box::new(state))
552}
553
554pub fn test_bootstrap_args() -> BootstrapArgs {
555 BootstrapArgs {
556 default_cluster_replica_size: "scale=1,workers=1".into(),
557 default_cluster_replication_factor: 1,
558 bootstrap_role: None,
559 cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
560 }
561}