1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_persist_types::ShardId;
25use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlScalarType};
26use mz_sql::catalog::CatalogError as SqlCatalogError;
27use uuid::Uuid;
28
29use crate::config::ClusterReplicaSizeMap;
30use crate::durable::debug::{DebugCatalogState, Trace};
31pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
32pub use crate::durable::metrics::Metrics;
33use crate::durable::objects::AuditLog;
34pub use crate::durable::objects::Snapshot;
35pub use crate::durable::objects::state_update::StateUpdate;
36use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
37pub use crate::durable::objects::{
38 BurstState, Cluster, ClusterConfig, ClusterReplica, ClusterSystemConfiguration, ClusterVariant,
39 ClusterVariantManaged, Comment, Database, DefaultPrivilege, IntrospectionSourceIndex, Item,
40 NetworkPolicy, ReconfigurationState, ReconfigurationTarget, ReplicaConfig, ReplicaLocation,
41 ReplicaSystemConfiguration, Role, RoleAuth, Schema, SourceReference, SourceReferences,
42 StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
43 UnfinalizedShard,
44};
45pub use crate::durable::persist::shard_id;
46use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
47pub use crate::durable::transaction::Transaction;
48use crate::durable::transaction::TransactionBatch;
49pub use crate::durable::upgrade::CATALOG_VERSION;
50use crate::memory;
51
52pub mod debug;
53mod error;
54pub mod initialize;
55mod metrics;
56pub mod objects;
57mod persist;
58mod traits;
59mod transaction;
60mod upgrade;
61
// ---------------------------------------------------------------------------
// ID allocator keys.
//
// Each key names an independent ID allocator; `get_next_id` and `allocate_id`
// (see the catalog state traits) take one of these as `id_type`.
// ---------------------------------------------------------------------------

/// Allocator key for database IDs.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
/// Allocator key for schema IDs.
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// Allocator key for user item IDs (used by `get_next_user_item_id` / `allocate_user_ids`).
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// Allocator key for system item IDs (used by `get_next_system_item_id`).
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
/// Allocator key for user role IDs.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// Allocator key for user cluster IDs (used by `allocate_user_cluster_id`).
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
/// Allocator key for system cluster IDs.
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// Allocator key for user replica IDs (used by `allocate_user_replica_ids`).
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// Allocator key for system replica IDs (used by `allocate_system_replica_ids`).
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
/// Allocator key for audit log event IDs.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
/// Allocator key for storage usage record IDs.
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
/// Allocator key for user network policy IDs.
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
/// Allocator key for OIDs.
pub const OID_ALLOC_KEY: &str = "oid";

// ---------------------------------------------------------------------------
// Other well-known keys.
//
// NOTE(review): these do not follow the `*_ALLOC_KEY` naming; presumably they
// key catalog settings/config entries rather than ID allocators — confirm.
// ---------------------------------------------------------------------------

/// Key under which the catalog content version is stored.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
/// Key naming the builtin-migration shard.
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
/// Key naming the expression-cache shard.
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
/// Key for the mock authentication nonce.
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
79
80#[derive(Clone, Debug)]
81pub struct BootstrapArgs {
82 pub cluster_replica_size_map: ClusterReplicaSizeMap,
83 pub default_cluster_replica_size: String,
84 pub default_cluster_replication_factor: u32,
85 pub bootstrap_role: Option<String>,
86}
87
88pub type Epoch = NonZeroI64;
89
90#[async_trait]
94pub trait OpenableDurableCatalogState: Debug + Send {
95 async fn open_savepoint(
114 mut self: Box<Self>,
115 initial_ts: Timestamp,
116 bootstrap_args: &BootstrapArgs,
117 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
118
119 async fn open_read_only(
125 mut self: Box<Self>,
126 bootstrap_args: &BootstrapArgs,
127 ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
128
129 async fn open(
137 mut self: Box<Self>,
138 initial_ts: Timestamp,
139 bootstrap_args: &BootstrapArgs,
140 ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
141
142 async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
145
146 async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
148
149 async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
159
160 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
163
164 async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
170
171 async fn get_0dt_deployment_ddl_check_interval(
177 &mut self,
178 ) -> Result<Option<Duration>, CatalogError>;
179
180 async fn get_enable_0dt_deployment_panic_after_timeout(
187 &mut self,
188 ) -> Result<Option<bool>, CatalogError>;
189
190 async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
192
193 async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
195
196 async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
198
199 async fn expire(self: Box<Self>);
201}
202
203#[async_trait]
205pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
206 fn epoch(&self) -> Epoch;
216
217 fn metrics(&self) -> &Metrics;
219
220 async fn expire(self: Box<Self>);
222
223 fn is_bootstrap_complete(&self) -> bool;
225
226 async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
232
233 async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
235
236 async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
238 self.get_next_id(USER_ITEM_ALLOC_KEY).await
239 }
240
241 async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
243 self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
244 }
245
246 async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
248 self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
249 }
250
251 async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
253 self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
254 }
255
256 async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
258
259 async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
261
262 async fn sync_to_current_updates(
268 &mut self,
269 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
270
271 async fn sync_updates(
280 &mut self,
281 target_upper: Timestamp,
282 ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
283
284 async fn current_upper(&mut self) -> Timestamp;
286}
287
288#[async_trait]
290#[allow(mismatched_lifetime_syntaxes)]
291pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
292 fn is_read_only(&self) -> bool;
294
295 fn is_savepoint(&self) -> bool;
297
298 async fn mark_bootstrap_complete(&mut self);
300
301 async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
303
304 fn transaction_from_snapshot(
310 &mut self,
311 snapshot: Snapshot,
312 ) -> Result<Transaction, CatalogError>;
313
314 async fn commit_transaction(
322 &mut self,
323 txn_batch: TransactionBatch,
324 commit_ts: Timestamp,
325 ) -> Result<Timestamp, CatalogError>;
326
327 async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError>;
332
333 #[mz_ore::instrument(level = "debug")]
337 async fn allocate_id(
338 &mut self,
339 id_type: &str,
340 amount: u64,
341 commit_ts: Timestamp,
342 ) -> Result<Vec<u64>, CatalogError> {
343 let start = Instant::now();
344 if amount == 0 {
345 return Ok(Vec::new());
346 }
347 let mut txn = self.transaction().await?;
348 let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
349 txn.commit_internal(commit_ts).await?;
350 self.metrics()
351 .allocate_id_seconds
352 .observe(start.elapsed().as_secs_f64());
353 Ok(ids)
354 }
355
356 async fn allocate_user_ids(
360 &mut self,
361 amount: u64,
362 commit_ts: Timestamp,
363 ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
364 let ids = self
365 .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
366 .await?;
367 let ids = ids
368 .iter()
369 .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
370 .collect();
371 Ok(ids)
372 }
373
374 async fn allocate_user_id(
378 &mut self,
379 commit_ts: Timestamp,
380 ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
381 let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
382 let id = id.into_element();
383 Ok((CatalogItemId::User(id), GlobalId::User(id)))
384 }
385
386 async fn allocate_user_cluster_id(
390 &mut self,
391 commit_ts: Timestamp,
392 ) -> Result<ClusterId, CatalogError> {
393 let id = self
394 .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
395 .await?
396 .into_element();
397 Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
398 }
399
400 async fn allocate_user_replica_ids(
404 &mut self,
405 amount: u64,
406 commit_ts: Timestamp,
407 ) -> Result<Vec<ReplicaId>, CatalogError> {
408 let ids = self
409 .allocate_id(USER_REPLICA_ID_ALLOC_KEY, amount, commit_ts)
410 .await?;
411 let ids = ids.into_iter().map(ReplicaId::User).collect();
412 Ok(ids)
413 }
414
415 async fn allocate_system_replica_ids(
419 &mut self,
420 amount: u64,
421 commit_ts: Timestamp,
422 ) -> Result<Vec<ReplicaId>, CatalogError> {
423 let ids = self
424 .allocate_id(SYSTEM_REPLICA_ID_ALLOC_KEY, amount, commit_ts)
425 .await?;
426 let ids = ids.into_iter().map(ReplicaId::System).collect();
427 Ok(ids)
428 }
429
430 fn shard_id(&self) -> ShardId;
431}
432
433trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
434impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
435
436#[derive(Debug)]
438pub struct AuditLogIterator {
439 audit_logs: Box<dyn AuditLogIteratorTrait>,
442}
443
444impl AuditLogIterator {
445 fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
446 let audit_logs = audit_logs
447 .into_iter()
448 .map(|(kind, ts, diff)| {
449 assert_eq!(
450 diff,
451 Diff::ONE,
452 "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
453 );
454 assert!(
455 kind.is_audit_log(),
456 "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
457 );
458 let id = kind.audit_log_id();
459 (kind, ts, id)
460 })
461 .sorted_by_key(|(_, ts, id)| (*ts, *id))
462 .map(|(kind, ts, _id)| (kind, ts))
463 .rev()
464 .map(|(kind, ts)| {
465 let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
467 let kind: Option<memory::objects::StateUpdateKind> = (&kind)
468 .try_into()
469 .expect("invalid persisted update: {update:#?}");
470 let kind = kind.expect("audit log always produces im-memory updates");
471 let audit_log = match kind {
472 memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
473 kind => unreachable!("invalid kind: {kind:?}"),
474 };
475 (audit_log, ts)
476 });
477 Self {
478 audit_logs: Box::new(audit_logs),
479 }
480 }
481}
482
483impl Iterator for AuditLogIterator {
484 type Item = (AuditLog, Timestamp);
485
486 fn next(&mut self) -> Option<Self::Item> {
487 self.audit_logs.next()
488 }
489}
490
491pub fn persist_desc() -> RelationDesc {
494 RelationDesc::builder()
495 .with_column("data", SqlScalarType::Jsonb.nullable(false))
496 .finish()
497}
498
499#[derive(Debug, Clone)]
501pub struct TestCatalogStateBuilder {
502 persist_client: PersistClient,
503 organization_id: Uuid,
504 version: semver::Version,
505 deploy_generation: Option<u64>,
506 metrics: Arc<Metrics>,
507}
508
509impl TestCatalogStateBuilder {
510 pub fn new(persist_client: PersistClient) -> Self {
511 Self {
512 persist_client,
513 organization_id: Uuid::new_v4(),
514 version: semver::Version::new(0, 0, 0),
515 deploy_generation: None,
516 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
517 }
518 }
519
520 pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
521 self.organization_id = organization_id;
522 self
523 }
524
525 pub fn with_version(mut self, version: semver::Version) -> Self {
526 self.version = version;
527 self
528 }
529
530 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
531 self.deploy_generation = Some(deploy_generation);
532 self
533 }
534
535 pub fn with_default_deploy_generation(self) -> Self {
536 self.with_deploy_generation(0)
537 }
538
539 pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
540 self.metrics = metrics;
541 self
542 }
543
544 pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
545 persist_backed_catalog_state(
546 self.persist_client,
547 self.organization_id,
548 self.version,
549 self.deploy_generation,
550 self.metrics,
551 )
552 .await
553 }
554
555 pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
556 self.expect_build("failed to build").await
557 }
558
559 pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
560 self.build().await.expect(msg)
561 }
562}
563
564pub async fn persist_backed_catalog_state(
568 persist_client: PersistClient,
569 organization_id: Uuid,
570 version: semver::Version,
571 deploy_generation: Option<u64>,
572 metrics: Arc<Metrics>,
573) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
574 let state = UnopenedPersistCatalogState::new(
575 persist_client,
576 organization_id,
577 version,
578 deploy_generation,
579 metrics,
580 )
581 .await?;
582 Ok(Box::new(state))
583}
584
585pub fn test_bootstrap_args() -> BootstrapArgs {
586 BootstrapArgs {
587 default_cluster_replica_size: "scale=1,workers=1".into(),
588 default_cluster_replication_factor: 1,
589 bootstrap_role: None,
590 cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
591 }
592}