1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_persist_types::ShardId;
25use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlScalarType};
26use mz_sql::catalog::CatalogError as SqlCatalogError;
27use uuid::Uuid;
28
29use crate::config::ClusterReplicaSizeMap;
30use crate::durable::debug::{DebugCatalogState, Trace};
31pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
32pub use crate::durable::metrics::Metrics;
33use crate::durable::objects::AuditLog;
34pub use crate::durable::objects::Snapshot;
35pub use crate::durable::objects::state_update::StateUpdate;
36use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
37pub use crate::durable::objects::{
38 BurstState, Cluster, ClusterConfig, ClusterReplica, ClusterSystemConfiguration, ClusterVariant,
39 ClusterVariantManaged, Comment, Database, DefaultPrivilege, IntrospectionSourceIndex, Item,
40 NetworkPolicy, ReconfigurationState, ReconfigurationTarget, ReplicaConfig, ReplicaLocation,
41 ReplicaSystemConfiguration, Role, RoleAuth, Schema, SourceReference, SourceReferences,
42 StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
43 UnfinalizedShard,
44};
45pub use crate::durable::persist::shard_id;
46use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
47pub use crate::durable::transaction::Transaction;
48use crate::durable::transaction::TransactionBatch;
49pub use crate::durable::upgrade::CATALOG_VERSION;
50use crate::memory;
51
52pub mod debug;
53mod error;
54pub mod initialize;
55mod metrics;
56pub mod objects;
57mod persist;
58mod traits;
59mod transaction;
60mod upgrade;
61
// Keys naming the durable ID allocators. They are passed as the `id_type` argument of
// `ReadOnlyDurableCatalogState::get_next_id` and `DurableCatalogState::allocate_id`.
// NOTE(review): these string values are presumably persisted, so they must not be changed even
// where they differ from the constant name (e.g. `USER_CLUSTER_ID_ALLOC_KEY` = "user_compute").

/// Allocator key for database ids. NOTE(review): usage not visible here — confirm.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
/// Allocator key for schema ids. NOTE(review): usage not visible here — confirm.
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// Allocator key for user item ids; used by `get_next_user_item_id`, `allocate_user_ids` and
/// `allocate_user_id`.
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// Allocator key for system item ids; used by `get_next_system_item_id`.
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
/// Allocator key for user role ids. NOTE(review): usage not visible here — confirm.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// Allocator key for user cluster ids; used by `allocate_user_cluster_id`.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
/// Allocator key for system cluster ids. NOTE(review): usage not visible here — confirm.
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// Allocator key for user replica ids; used by `get_next_user_replica_id`.
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// Allocator key for system replica ids; used by `get_next_system_replica_id`.
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
/// Allocator key for audit log event ids. NOTE(review): usage not visible here — confirm.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
/// Allocator key for storage usage record ids. NOTE(review): usage not visible here — confirm.
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
/// Allocator key for user network policy ids. NOTE(review): usage not visible here — confirm.
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
/// Allocator key for OIDs. NOTE(review): usage not visible here — confirm.
pub const OID_ALLOC_KEY: &str = "oid";
// The keys below are not used as allocator keys in this file.
// NOTE(review): presumably they key durable settings/config entries — confirm.
/// Key under which the catalog content version is stored (crate-internal).
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
/// Key under which the builtin migration shard id is stored.
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
/// Key under which the expression cache shard id is stored.
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
/// Key under which the mock authentication nonce is stored.
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
79
/// Arguments supplied when opening a durable catalog.
///
/// Passed by reference to `OpenableDurableCatalogState::open`, `open_savepoint` and
/// `open_read_only`.
#[derive(Clone, Debug)]
pub struct BootstrapArgs {
    /// Mapping of cluster replica size names to their configurations.
    pub cluster_replica_size_map: ClusterReplicaSizeMap,
    /// Replica size used when none is specified (tests use `"scale=1,workers=1"`).
    pub default_cluster_replica_size: String,
    /// Replication factor used when none is specified.
    pub default_cluster_replication_factor: u32,
    /// Optional role name to create during bootstrap. NOTE(review): exact semantics not
    /// visible here — confirm.
    pub bootstrap_role: Option<String>,
}

/// Epoch of a durable catalog handle, as returned by the `epoch` methods; always non-zero.
pub type Epoch = NonZeroI64;
89
/// A durable catalog state that has not yet been opened.
///
/// The `open*` methods consume the boxed state (`self: Box<Self>`) and return an opened
/// handle. Most other methods take `&mut self` and leave the state unopened.
#[async_trait]
pub trait OpenableDurableCatalogState: Debug + Send {
    /// Opens the catalog in savepoint mode.
    ///
    /// Returns the opened state and an [`AuditLogIterator`] over the persisted audit log.
    /// NOTE(review): presumably changes made through a savepoint handle are not durably
    /// written — confirm against the implementation.
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog in read-only mode. Unlike the other `open*` methods, this takes no
    /// initial timestamp and returns no audit log iterator.
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;

    /// Opens the catalog for normal use.
    ///
    /// Returns the opened state and an [`AuditLogIterator`] over the persisted audit log.
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog as a [`DebugCatalogState`] for low-level inspection.
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;

    /// Reports whether the durable catalog has been initialized.
    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;

    /// Returns the current durable [`Epoch`].
    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;

    /// Returns the persisted deployment generation.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns the persisted 0dt deployment max-wait setting, if one is stored.
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;

    /// Returns the persisted 0dt deployment DDL check interval, if one is stored.
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError>;

    /// Returns the persisted "panic after timeout" flag for 0dt deployments, if one is stored.
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError>;

    /// Reports whether the system configuration has been synced at least once.
    ///
    /// Note: unlike the other methods this returns a [`DurableCatalogError`].
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;

    /// Returns an unconsolidated [`Trace`] of the durable catalog contents.
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Returns a consolidated [`Trace`] of the durable catalog contents.
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Consumes the state and releases it. NOTE(review): what "expire" releases is not visible
    /// here — confirm.
    async fn expire(self: Box<Self>);
}
202
/// Read operations available on an opened durable catalog.
#[async_trait]
pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
    /// Returns the [`Epoch`] of this handle.
    fn epoch(&self) -> Epoch;

    /// Returns the metrics associated with this handle.
    fn metrics(&self) -> &Metrics;

    /// Consumes the handle and releases it. NOTE(review): exact semantics not visible here —
    /// confirm.
    async fn expire(self: Box<Self>);

    /// Reports whether bootstrap has been marked complete.
    fn is_bootstrap_complete(&self) -> bool;

    /// Returns every audit log event stored in the catalog.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;

    /// Returns the next id of the allocator named by `id_type` (one of the `*_ALLOC_KEY`
    /// constants).
    ///
    /// NOTE(review): no commit timestamp is taken here, unlike `DurableCatalogState::allocate_id`.
    /// Presumably this only reads and does not reserve the id — confirm.
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;

    /// Shorthand for `get_next_id(USER_ITEM_ALLOC_KEY)`.
    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_ITEM_ALLOC_KEY).await
    }

    /// Shorthand for `get_next_id(SYSTEM_ITEM_ALLOC_KEY)`.
    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
    }

    /// Shorthand for `get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY)`.
    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
    }

    /// Shorthand for `get_next_id(USER_REPLICA_ID_ALLOC_KEY)`.
    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
    }

    /// Returns the persisted deployment generation.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns a [`Snapshot`] of the catalog contents.
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;

    /// Returns the in-memory updates needed to catch up to the current durable state.
    /// NOTE(review): "current" is presumably the durable upper at call time — confirm.
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Returns the in-memory updates needed to catch up to `target_upper`.
    async fn sync_updates(
        &mut self,
        target_upper: Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Returns the current upper [`Timestamp`] of the durable catalog.
    async fn current_upper(&mut self) -> Timestamp;
}
287
288#[async_trait]
290#[allow(mismatched_lifetime_syntaxes)]
291pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
292 fn is_read_only(&self) -> bool;
294
295 fn is_savepoint(&self) -> bool;
297
298 async fn mark_bootstrap_complete(&mut self);
300
301 async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
303
304 fn transaction_from_snapshot(
310 &mut self,
311 snapshot: Snapshot,
312 ) -> Result<Transaction, CatalogError>;
313
314 async fn commit_transaction(
322 &mut self,
323 txn_batch: TransactionBatch,
324 commit_ts: Timestamp,
325 ) -> Result<Timestamp, CatalogError>;
326
327 async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError>;
332
333 #[mz_ore::instrument(level = "debug")]
337 async fn allocate_id(
338 &mut self,
339 id_type: &str,
340 amount: u64,
341 commit_ts: Timestamp,
342 ) -> Result<Vec<u64>, CatalogError> {
343 let start = Instant::now();
344 if amount == 0 {
345 return Ok(Vec::new());
346 }
347 let mut txn = self.transaction().await?;
348 let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
349 txn.commit_internal(commit_ts).await?;
350 self.metrics()
351 .allocate_id_seconds
352 .observe(start.elapsed().as_secs_f64());
353 Ok(ids)
354 }
355
356 async fn allocate_user_ids(
360 &mut self,
361 amount: u64,
362 commit_ts: Timestamp,
363 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
364 let ids = self
365 .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
366 .await?;
367 let ids = ids
368 .iter()
369 .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
370 .collect();
371 Ok(ids)
372 }
373
374 async fn allocate_user_id(
378 &mut self,
379 commit_ts: Timestamp,
380 ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
381 let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
382 let id = id.into_element();
383 Ok((CatalogItemId::User(id), GlobalId::User(id)))
384 }
385
386 async fn allocate_user_cluster_id(
390 &mut self,
391 commit_ts: Timestamp,
392 ) -> Result<ClusterId, CatalogError> {
393 let id = self
394 .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
395 .await?
396 .into_element();
397 Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
398 }
399
400 fn shard_id(&self) -> ShardId;
401}
402
403trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
404impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
405
/// Iterator over audit log entries, each paired with its [`Timestamp`].
///
/// As constructed by `AuditLogIterator::new`, entries are yielded in descending
/// `(timestamp, audit log id)` order.
#[derive(Debug)]
pub struct AuditLogIterator {
    /// Type-erased underlying iterator that produces the entries.
    audit_logs: Box<dyn AuditLogIteratorTrait>,
}
413
414impl AuditLogIterator {
415 fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
416 let audit_logs = audit_logs
417 .into_iter()
418 .map(|(kind, ts, diff)| {
419 assert_eq!(
420 diff,
421 Diff::ONE,
422 "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
423 );
424 assert!(
425 kind.is_audit_log(),
426 "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
427 );
428 let id = kind.audit_log_id();
429 (kind, ts, id)
430 })
431 .sorted_by_key(|(_, ts, id)| (*ts, *id))
432 .map(|(kind, ts, _id)| (kind, ts))
433 .rev()
434 .map(|(kind, ts)| {
435 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
437 let kind: Option<memory::objects::StateUpdateKind> = (&kind)
438 .try_into()
439 .expect("invalid persisted update: {update:#?}");
440 let kind = kind.expect("audit log always produces im-memory updates");
441 let audit_log = match kind {
442 memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
443 kind => unreachable!("invalid kind: {kind:?}"),
444 };
445 (audit_log, ts)
446 });
447 Self {
448 audit_logs: Box::new(audit_logs),
449 }
450 }
451}
452
453impl Iterator for AuditLogIterator {
454 type Item = (AuditLog, Timestamp);
455
456 fn next(&mut self) -> Option<Self::Item> {
457 self.audit_logs.next()
458 }
459}
460
461pub fn persist_desc() -> RelationDesc {
464 RelationDesc::builder()
465 .with_column("data", SqlScalarType::Jsonb.nullable(false))
466 .finish()
467}
468
/// Builder for a persist-backed [`OpenableDurableCatalogState`], intended for tests.
#[derive(Debug, Clone)]
pub struct TestCatalogStateBuilder {
    /// Persist client used to open the catalog shard.
    persist_client: PersistClient,
    /// Organization id of the catalog; `new` picks a random v4 UUID.
    organization_id: Uuid,
    /// Version passed to the catalog; `new` uses `0.0.0`.
    version: semver::Version,
    /// Optional deploy generation; `new` leaves it unset.
    deploy_generation: Option<u64>,
    /// Catalog metrics; `new` registers them in a fresh `MetricsRegistry`.
    metrics: Arc<Metrics>,
}
478
479impl TestCatalogStateBuilder {
480 pub fn new(persist_client: PersistClient) -> Self {
481 Self {
482 persist_client,
483 organization_id: Uuid::new_v4(),
484 version: semver::Version::new(0, 0, 0),
485 deploy_generation: None,
486 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
487 }
488 }
489
490 pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
491 self.organization_id = organization_id;
492 self
493 }
494
495 pub fn with_version(mut self, version: semver::Version) -> Self {
496 self.version = version;
497 self
498 }
499
500 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
501 self.deploy_generation = Some(deploy_generation);
502 self
503 }
504
505 pub fn with_default_deploy_generation(self) -> Self {
506 self.with_deploy_generation(0)
507 }
508
509 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
510 self.metrics = metrics;
511 self
512 }
513
514 pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
515 persist_backed_catalog_state(
516 self.persist_client,
517 self.organization_id,
518 self.version,
519 self.deploy_generation,
520 self.metrics,
521 )
522 .await
523 }
524
525 pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
526 self.expect_build("failed to build").await
527 }
528
529 pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
530 self.build().await.expect(msg)
531 }
532}
533
534pub async fn persist_backed_catalog_state(
538 persist_client: PersistClient,
539 organization_id: Uuid,
540 version: semver::Version,
541 deploy_generation: Option<u64>,
542 metrics: Arc<Metrics>,
543) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
544 let state = UnopenedPersistCatalogState::new(
545 persist_client,
546 organization_id,
547 version,
548 deploy_generation,
549 metrics,
550 )
551 .await?;
552 Ok(Box::new(state))
553}
554
555pub fn test_bootstrap_args() -> BootstrapArgs {
556 BootstrapArgs {
557 default_cluster_replica_size: "scale=1,workers=1".into(),
558 default_cluster_replication_factor: 1,
559 bootstrap_role: None,
560 cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
561 }
562}