1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Diff, GlobalId};
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use uuid::Uuid;
27
28use crate::config::ClusterReplicaSizeMap;
29use crate::durable::debug::{DebugCatalogState, Trace};
30pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
31pub use crate::durable::metrics::Metrics;
32pub use crate::durable::objects::state_update::StateUpdate;
33use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34use crate::durable::objects::{AuditLog, Snapshot};
35pub use crate::durable::objects::{
36 Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
37 Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
38 ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
39 StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
40 UnfinalizedShard,
41};
42pub use crate::durable::persist::shard_id;
43use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
44pub use crate::durable::transaction::Transaction;
45use crate::durable::transaction::TransactionBatch;
46pub use crate::durable::upgrade::CATALOG_VERSION;
47use crate::memory;
48
49pub mod debug;
50mod error;
51pub mod initialize;
52mod metrics;
53pub mod objects;
54mod persist;
55mod traits;
56mod transaction;
57mod upgrade;
58
// Names of the catalog ID allocators. Several of these are passed as `id_type` to
// `get_next_id` / `allocate_id`. NOTE(review): presumably the string values are persisted,
// so they must never change even where they no longer match the constant's name
// (e.g. clusters use "user_compute") — confirm before renaming anything.

// Namespaces and catalog items.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
pub const USER_ITEM_ALLOC_KEY: &str = "user";
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";

// Roles and network policies.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";

// Clusters and their replicas.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";

// Event-style records and OIDs.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const OID_ALLOC_KEY: &str = "oid";

// Keys that are not ID allocators. NOTE(review): how these are stored is not visible in this
// module — confirm against the persist/transaction code.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
76
77#[derive(Clone, Debug)]
78pub struct BootstrapArgs {
79 pub cluster_replica_size_map: ClusterReplicaSizeMap,
80 pub default_cluster_replica_size: String,
81 pub default_cluster_replication_factor: u32,
82 pub bootstrap_role: Option<String>,
83}
84
/// Epoch reported by a durable catalog handle (see the `epoch` methods on the catalog traits).
///
/// Represented as a [`NonZeroI64`], so an epoch value is never zero and `Option<Epoch>` costs
/// no extra space.
pub type Epoch = NonZeroI64;
86
87#[async_trait]
91pub trait OpenableDurableCatalogState: Debug + Send {
92 async fn open_savepoint(
111 mut self: Box<Self>,
112 initial_ts: Timestamp,
113 bootstrap_args: &BootstrapArgs,
114 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
115
116 async fn open_read_only(
122 mut self: Box<Self>,
123 bootstrap_args: &BootstrapArgs,
124 ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
125
126 async fn open(
134 mut self: Box<Self>,
135 initial_ts: Timestamp,
136 bootstrap_args: &BootstrapArgs,
137 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
138
139 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
142
143 async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
145
146 async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
156
157 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
160
161 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
167
168 async fn get_0dt_deployment_ddl_check_interval(
174 &mut self,
175 ) -> Result<Option<Duration>, CatalogError>;
176
177 async fn get_enable_0dt_deployment_panic_after_timeout(
184 &mut self,
185 ) -> Result<Option<bool>, CatalogError>;
186
187 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
189
190 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
192
193 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
195
196 async fn expire(self: Box<Self>);
198}
199
200#[async_trait]
202pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
203 fn epoch(&self) -> Epoch;
213
214 fn metrics(&self) -> &Metrics;
216
217 async fn expire(self: Box<Self>);
219
220 fn is_bootstrap_complete(&self) -> bool;
222
223 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
229
230 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
232
233 async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
235 self.get_next_id(USER_ITEM_ALLOC_KEY).await
236 }
237
238 async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
240 self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
241 }
242
243 async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
245 self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
246 }
247
248 async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
250 self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
251 }
252
253 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
255
256 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
258
259 async fn sync_to_current_updates(
265 &mut self,
266 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
267
268 async fn sync_updates(
277 &mut self,
278 target_upper: Timestamp,
279 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
280
281 async fn current_upper(&mut self) -> Timestamp;
283}
284
285#[async_trait]
287#[allow(mismatched_lifetime_syntaxes)]
288pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
289 fn is_read_only(&self) -> bool;
291
292 fn is_savepoint(&self) -> bool;
294
295 async fn mark_bootstrap_complete(&mut self);
297
298 async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
300
301 async fn commit_transaction(
309 &mut self,
310 txn_batch: TransactionBatch,
311 commit_ts: Timestamp,
312 ) -> Result<Timestamp, CatalogError>;
313
314 async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
318
319 #[mz_ore::instrument(level = "debug")]
323 async fn allocate_id(
324 &mut self,
325 id_type: &str,
326 amount: u64,
327 commit_ts: Timestamp,
328 ) -> Result<Vec<u64>, CatalogError> {
329 let start = Instant::now();
330 if amount == 0 {
331 return Ok(Vec::new());
332 }
333 let mut txn = self.transaction().await?;
334 let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
335 txn.commit_internal(commit_ts).await?;
336 self.metrics()
337 .allocate_id_seconds
338 .observe(start.elapsed().as_secs_f64());
339 Ok(ids)
340 }
341
342 async fn allocate_user_ids(
346 &mut self,
347 amount: u64,
348 commit_ts: Timestamp,
349 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
350 let ids = self
351 .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
352 .await?;
353 let ids = ids
354 .iter()
355 .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
356 .collect();
357 Ok(ids)
358 }
359
360 async fn allocate_user_id(
364 &mut self,
365 commit_ts: Timestamp,
366 ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
367 let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
368 let id = id.into_element();
369 Ok((CatalogItemId::User(id), GlobalId::User(id)))
370 }
371
372 async fn allocate_user_cluster_id(
376 &mut self,
377 commit_ts: Timestamp,
378 ) -> Result<ClusterId, CatalogError> {
379 let id = self
380 .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
381 .await?
382 .into_element();
383 Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
384 }
385}
386
387trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
388impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
389
390#[derive(Debug)]
392pub struct AuditLogIterator {
393 audit_logs: Box<dyn AuditLogIteratorTrait>,
396}
397
398impl AuditLogIterator {
399 fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
400 let audit_logs = audit_logs
401 .into_iter()
402 .map(|(kind, ts, diff)| {
403 assert_eq!(
404 diff,
405 Diff::ONE,
406 "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
407 );
408 assert!(
409 kind.is_audit_log(),
410 "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
411 );
412 let id = kind.audit_log_id();
413 (kind, ts, id)
414 })
415 .sorted_by_key(|(_, ts, id)| (*ts, *id))
416 .map(|(kind, ts, _id)| (kind, ts))
417 .rev()
418 .map(|(kind, ts)| {
419 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
421 let kind: Option<memory::objects::StateUpdateKind> = (&kind)
422 .try_into()
423 .expect("invalid persisted update: {update:#?}");
424 let kind = kind.expect("audit log always produces im-memory updates");
425 let audit_log = match kind {
426 memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
427 kind => unreachable!("invalid kind: {kind:?}"),
428 };
429 (audit_log, ts)
430 });
431 Self {
432 audit_logs: Box::new(audit_logs),
433 }
434 }
435}
436
437impl Iterator for AuditLogIterator {
438 type Item = (AuditLog, Timestamp);
439
440 fn next(&mut self) -> Option<Self::Item> {
441 self.audit_logs.next()
442 }
443}
444
445#[derive(Debug, Clone)]
447pub struct TestCatalogStateBuilder {
448 persist_client: PersistClient,
449 organization_id: Uuid,
450 version: semver::Version,
451 deploy_generation: Option<u64>,
452 metrics: Arc<Metrics>,
453}
454
455impl TestCatalogStateBuilder {
456 pub fn new(persist_client: PersistClient) -> Self {
457 Self {
458 persist_client,
459 organization_id: Uuid::new_v4(),
460 version: semver::Version::new(0, 0, 0),
461 deploy_generation: None,
462 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
463 }
464 }
465
466 pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
467 self.organization_id = organization_id;
468 self
469 }
470
471 pub fn with_version(mut self, version: semver::Version) -> Self {
472 self.version = version;
473 self
474 }
475
476 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
477 self.deploy_generation = Some(deploy_generation);
478 self
479 }
480
481 pub fn with_default_deploy_generation(self) -> Self {
482 self.with_deploy_generation(0)
483 }
484
485 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
486 self.metrics = metrics;
487 self
488 }
489
490 pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
491 persist_backed_catalog_state(
492 self.persist_client,
493 self.organization_id,
494 self.version,
495 self.deploy_generation,
496 self.metrics,
497 )
498 .await
499 }
500
501 pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
502 self.expect_build("failed to build").await
503 }
504
505 pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
506 self.build().await.expect(msg)
507 }
508}
509
510pub async fn persist_backed_catalog_state(
514 persist_client: PersistClient,
515 organization_id: Uuid,
516 version: semver::Version,
517 deploy_generation: Option<u64>,
518 metrics: Arc<Metrics>,
519) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
520 let state = UnopenedPersistCatalogState::new(
521 persist_client,
522 organization_id,
523 version,
524 deploy_generation,
525 metrics,
526 )
527 .await?;
528 Ok(Box::new(state))
529}
530
531pub fn test_bootstrap_args() -> BootstrapArgs {
532 BootstrapArgs {
533 default_cluster_replica_size: "scale=1,workers=1".into(),
534 default_cluster_replication_factor: 1,
535 bootstrap_role: None,
536 cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
537 }
538}