1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Diff, GlobalId};
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use uuid::Uuid;
27
28use crate::config::ClusterReplicaSizeMap;
29use crate::durable::debug::{DebugCatalogState, Trace};
30pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
31pub use crate::durable::metrics::Metrics;
32pub use crate::durable::objects::state_update::StateUpdate;
33use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34use crate::durable::objects::{AuditLog, Snapshot};
35pub use crate::durable::objects::{
36 Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
37 Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
38 ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
39 StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
40 UnfinalizedShard,
41};
42pub use crate::durable::persist::shard_id;
43use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
44pub use crate::durable::transaction::Transaction;
45use crate::durable::transaction::TransactionBatch;
46pub use crate::durable::upgrade::CATALOG_VERSION;
47use crate::memory;
48
49pub mod debug;
50mod error;
51pub mod initialize;
52mod metrics;
53pub mod objects;
54mod persist;
55mod traits;
56mod transaction;
57mod upgrade;
58
59pub const DATABASE_ID_ALLOC_KEY: &str = "database";
60pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
61pub const USER_ITEM_ALLOC_KEY: &str = "user";
62pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
63pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
64pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
65pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
66pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
67pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
68pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
69pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
70pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
71pub const OID_ALLOC_KEY: &str = "oid";
72pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
73pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
74pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
75pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
76
77#[derive(Clone, Debug)]
78pub struct BootstrapArgs {
79 pub cluster_replica_size_map: ClusterReplicaSizeMap,
80 pub default_cluster_replica_size: String,
81 pub default_cluster_replication_factor: u32,
82 pub bootstrap_role: Option<String>,
83}
84
85pub type Epoch = NonZeroI64;
86
87#[async_trait]
91pub trait OpenableDurableCatalogState: Debug + Send {
92 async fn open_savepoint(
111 mut self: Box<Self>,
112 initial_ts: Timestamp,
113 bootstrap_args: &BootstrapArgs,
114 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
115
116 async fn open_read_only(
122 mut self: Box<Self>,
123 bootstrap_args: &BootstrapArgs,
124 ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
125
126 async fn open(
134 mut self: Box<Self>,
135 initial_ts: Timestamp,
136 bootstrap_args: &BootstrapArgs,
137 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
138
139 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
142
143 async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
145
146 async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
156
157 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
160
161 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
167
168 async fn get_0dt_deployment_ddl_check_interval(
174 &mut self,
175 ) -> Result<Option<Duration>, CatalogError>;
176
177 async fn get_enable_0dt_deployment_panic_after_timeout(
184 &mut self,
185 ) -> Result<Option<bool>, CatalogError>;
186
187 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
189
190 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
192
193 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
195
196 async fn expire(self: Box<Self>);
198}
199
200#[async_trait]
202pub trait ReadOnlyDurableCatalogState: Debug + Send {
203 fn epoch(&self) -> Epoch;
213
214 async fn expire(self: Box<Self>);
216
217 fn is_bootstrap_complete(&self) -> bool;
219
220 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
226
227 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
229
230 async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
232 self.get_next_id(USER_ITEM_ALLOC_KEY).await
233 }
234
235 async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
237 self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
238 }
239
240 async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
242 self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
243 }
244
245 async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
247 self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
248 }
249
250 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
252
253 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
255
256 async fn sync_to_current_updates(
262 &mut self,
263 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
264
265 async fn sync_updates(
274 &mut self,
275 target_upper: Timestamp,
276 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
277
278 async fn current_upper(&mut self) -> Timestamp;
280}
281
282#[async_trait]
284#[allow(mismatched_lifetime_syntaxes)]
285pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
286 fn is_read_only(&self) -> bool;
288
289 fn is_savepoint(&self) -> bool;
291
292 fn mark_bootstrap_complete(&mut self);
294
295 async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
297
298 async fn commit_transaction(
306 &mut self,
307 txn_batch: TransactionBatch,
308 commit_ts: Timestamp,
309 ) -> Result<Timestamp, CatalogError>;
310
311 async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
315
316 #[mz_ore::instrument(level = "debug")]
320 async fn allocate_id(
321 &mut self,
322 id_type: &str,
323 amount: u64,
324 commit_ts: Timestamp,
325 ) -> Result<Vec<u64>, CatalogError> {
326 if amount == 0 {
327 return Ok(Vec::new());
328 }
329 let mut txn = self.transaction().await?;
330 let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
331 txn.commit_internal(commit_ts).await?;
332 Ok(ids)
333 }
334
335 async fn allocate_user_ids(
339 &mut self,
340 amount: u64,
341 commit_ts: Timestamp,
342 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
343 let ids = self
344 .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
345 .await?;
346 let ids = ids
347 .iter()
348 .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
349 .collect();
350 Ok(ids)
351 }
352
353 async fn allocate_user_id(
357 &mut self,
358 commit_ts: Timestamp,
359 ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
360 let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
361 let id = id.into_element();
362 Ok((CatalogItemId::User(id), GlobalId::User(id)))
363 }
364
365 async fn allocate_user_cluster_id(
369 &mut self,
370 commit_ts: Timestamp,
371 ) -> Result<ClusterId, CatalogError> {
372 let id = self
373 .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
374 .await?
375 .into_element();
376 Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
377 }
378}
379
380trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
381impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
382
383#[derive(Debug)]
385pub struct AuditLogIterator {
386 audit_logs: Box<dyn AuditLogIteratorTrait>,
389}
390
391impl AuditLogIterator {
392 fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
393 let audit_logs = audit_logs
394 .into_iter()
395 .map(|(kind, ts, diff)| {
396 assert_eq!(
397 diff,
398 Diff::ONE,
399 "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
400 );
401 assert!(
402 kind.is_audit_log(),
403 "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
404 );
405 let id = kind.audit_log_id();
406 (kind, ts, id)
407 })
408 .sorted_by_key(|(_, ts, id)| (*ts, *id))
409 .map(|(kind, ts, _id)| (kind, ts))
410 .rev()
411 .map(|(kind, ts)| {
412 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
414 let kind: Option<memory::objects::StateUpdateKind> = (&kind)
415 .try_into()
416 .expect("invalid persisted update: {update:#?}");
417 let kind = kind.expect("audit log always produces im-memory updates");
418 let audit_log = match kind {
419 memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
420 kind => unreachable!("invalid kind: {kind:?}"),
421 };
422 (audit_log, ts)
423 });
424 Self {
425 audit_logs: Box::new(audit_logs),
426 }
427 }
428}
429
430impl Iterator for AuditLogIterator {
431 type Item = (AuditLog, Timestamp);
432
433 fn next(&mut self) -> Option<Self::Item> {
434 self.audit_logs.next()
435 }
436}
437
438#[derive(Debug, Clone)]
440pub struct TestCatalogStateBuilder {
441 persist_client: PersistClient,
442 organization_id: Uuid,
443 version: semver::Version,
444 deploy_generation: Option<u64>,
445 metrics: Arc<Metrics>,
446}
447
448impl TestCatalogStateBuilder {
449 pub fn new(persist_client: PersistClient) -> Self {
450 Self {
451 persist_client,
452 organization_id: Uuid::new_v4(),
453 version: semver::Version::new(0, 0, 0),
454 deploy_generation: None,
455 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
456 }
457 }
458
459 pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
460 self.organization_id = organization_id;
461 self
462 }
463
464 pub fn with_version(mut self, version: semver::Version) -> Self {
465 self.version = version;
466 self
467 }
468
469 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
470 self.deploy_generation = Some(deploy_generation);
471 self
472 }
473
474 pub fn with_default_deploy_generation(self) -> Self {
475 self.with_deploy_generation(0)
476 }
477
478 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
479 self.metrics = metrics;
480 self
481 }
482
483 pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
484 persist_backed_catalog_state(
485 self.persist_client,
486 self.organization_id,
487 self.version,
488 self.deploy_generation,
489 self.metrics,
490 )
491 .await
492 }
493
494 pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
495 self.expect_build("failed to build").await
496 }
497
498 pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
499 self.build().await.expect(msg)
500 }
501}
502
503pub async fn persist_backed_catalog_state(
507 persist_client: PersistClient,
508 organization_id: Uuid,
509 version: semver::Version,
510 deploy_generation: Option<u64>,
511 metrics: Arc<Metrics>,
512) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
513 let state = UnopenedPersistCatalogState::new(
514 persist_client,
515 organization_id,
516 version,
517 deploy_generation,
518 metrics,
519 )
520 .await?;
521 Ok(Box::new(state))
522}
523
524pub fn test_bootstrap_args() -> BootstrapArgs {
525 BootstrapArgs {
526 default_cluster_replica_size: "scale=1,workers=1".into(),
527 default_cluster_replication_factor: 1,
528 bootstrap_role: None,
529 cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
530 }
531}