1use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Diff, GlobalId};
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use uuid::Uuid;
27
28use crate::config::ClusterReplicaSizeMap;
29use crate::durable::debug::{DebugCatalogState, Trace};
30pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
31pub use crate::durable::metrics::Metrics;
32pub use crate::durable::objects::state_update::StateUpdate;
33use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34use crate::durable::objects::{AuditLog, Snapshot};
35pub use crate::durable::objects::{
36    Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
37    Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
38    ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
39    StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
40    UnfinalizedShard,
41};
42pub use crate::durable::persist::shard_id;
43use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
44pub use crate::durable::transaction::Transaction;
45use crate::durable::transaction::TransactionBatch;
46pub use crate::durable::upgrade::CATALOG_VERSION;
47use crate::memory;
48
49pub mod debug;
50mod error;
51pub mod initialize;
52mod metrics;
53pub mod objects;
54mod persist;
55mod traits;
56mod transaction;
57mod upgrade;
58
// String keys naming individual ID allocators. Within this file, several of them are passed as
// the `id_type` argument of `get_next_id` and `allocate_id`.
//
// NOTE(review): these strings are presumably written to the durable catalog, in which case
// changing a value would orphan existing state — confirm before editing any of them.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// Allocator key for user items; used by `get_next_user_item_id` and the `allocate_user_id*`
/// helpers.
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// Allocator key for system items; used by `get_next_system_item_id`.
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// Allocator key for user clusters; used by `allocate_user_cluster_id`.
// NOTE(review): the value says "compute" while the name says "cluster" — presumably a
// historical name kept for compatibility; confirm.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// Allocator key for user replicas; used by `get_next_user_replica_id`.
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// Allocator key for system replicas; used by `get_next_system_replica_id`.
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
pub const OID_ALLOC_KEY: &str = "oid";
// NOTE(review): the keys below do not follow the `*_ALLOC_KEY` naming and are not used in this
// file; judging by their names they identify individual stored settings rather than ID
// allocators — confirm against their call sites.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
76
/// Arguments passed by reference to `open`, `open_savepoint`, and `open_read_only` on
/// [`OpenableDurableCatalogState`].
///
/// NOTE(review): presumably these only matter while a catalog is being initialized — confirm
/// against the implementations of those methods.
#[derive(Clone, Debug)]
pub struct BootstrapArgs {
    /// The map of known cluster replica sizes.
    pub cluster_replica_size_map: ClusterReplicaSizeMap,
    /// Name of the default replica size.
    // NOTE(review): presumably has to be a key of `cluster_replica_size_map` — confirm.
    pub default_cluster_replica_size: String,
    /// Default replication factor for clusters.
    pub default_cluster_replication_factor: u32,
    /// Optional name of a bootstrap role; `None` when no such role is requested.
    pub bootstrap_role: Option<String>,
}
84
/// The value returned by the `epoch` methods of the catalog state traits; a non-zero `i64`.
// NOTE(review): presumably acts as a fencing token between catalog writers — confirm.
pub type Epoch = NonZeroI64;
86
/// A handle to a durable catalog that has not been opened yet.
///
/// The `open*` methods consume the boxed handle and return an opened catalog state. The other
/// methods query the catalog through `&mut self` and leave the handle usable afterwards.
#[async_trait]
pub trait OpenableDurableCatalogState: Debug + Send {
    /// Opens the catalog in savepoint mode, consuming the handle.
    ///
    /// On success returns the opened state together with an [`AuditLogIterator`].
    ///
    /// NOTE(review): presumably a savepoint catalog accepts writes without durably committing
    /// them, and `initial_ts` is only relevant when the catalog has to be initialized —
    /// confirm against the implementation.
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog in read-only mode, consuming the handle.
    ///
    /// Unlike `open` and `open_savepoint`, this takes no initial timestamp and returns no
    /// [`AuditLogIterator`].
    ///
    /// NOTE(review): presumably mutating operations fail on the returned state, and opening
    /// fails if the catalog would need initialization or migration — confirm.
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;

    /// Opens the catalog, consuming the handle.
    ///
    /// On success returns the opened state together with an [`AuditLogIterator`].
    ///
    /// NOTE(review): presumably this is the writable mode, initializing and migrating the
    /// catalog as needed with `initial_ts` as the starting timestamp — confirm.
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog as a [`DebugCatalogState`], consuming the handle.
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;

    /// Reports whether the catalog has been initialized.
    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;

    /// Returns the catalog's [`Epoch`].
    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;

    /// Returns a deployment generation read from the catalog.
    // NOTE(review): presumably the most recently written generation, which need not be this
    // process's own — confirm.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns the 0dt deployment max-wait setting, or `None` if there is no value.
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;

    /// Returns the 0dt deployment DDL check interval setting, or `None` if there is no value.
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError>;

    /// Returns the "panic after timeout" 0dt deployment setting, or `None` if there is no
    /// value.
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError>;

    /// Reports whether the system configuration has been synced at least once.
    ///
    /// Note that this returns a [`DurableCatalogError`], not a [`CatalogError`] like the
    /// other methods of this trait.
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;

    /// Produces an unconsolidated [`Trace`] of the catalog.
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Produces a consolidated [`Trace`] of the catalog.
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Consumes the handle without opening the catalog.
    // NOTE(review): presumably releases resources that can only be released in an async
    // context — confirm.
    async fn expire(self: Box<Self>);
}
199
/// The read-only portion of the API of an opened durable catalog.
///
/// [`DurableCatalogState`] extends this trait with the mutating operations.
#[async_trait]
pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
    /// Returns the [`Epoch`] of this catalog state.
    fn epoch(&self) -> Epoch;

    /// Consumes the catalog state.
    // NOTE(review): presumably releases resources that can only be released in an async
    // context — confirm.
    async fn expire(self: Box<Self>);

    /// Reports whether bootstrapping is complete.
    fn is_bootstrap_complete(&self) -> bool;

    /// Returns the audit log events of the catalog.
    // NOTE(review): ordering and cost are not visible from this declaration — confirm before
    // relying on either.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;

    /// Returns the next ID of the allocator named `id_type` (for example
    /// [`USER_ITEM_ALLOC_KEY`]).
    // NOTE(review): presumably this only peeks at the ID and does not allocate it — confirm.
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;

    /// Returns the next user item ID; shorthand for `get_next_id(USER_ITEM_ALLOC_KEY)`.
    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_ITEM_ALLOC_KEY).await
    }

    /// Returns the next system item ID; shorthand for `get_next_id(SYSTEM_ITEM_ALLOC_KEY)`.
    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
    }

    /// Returns the next system replica ID; shorthand for
    /// `get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY)`.
    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
    }

    /// Returns the next user replica ID; shorthand for
    /// `get_next_id(USER_REPLICA_ID_ALLOC_KEY)`.
    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
    }

    /// Returns a deployment generation.
    // NOTE(review): whether this is the generation of this instance or the latest one written
    // to the catalog is not visible here — confirm.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Returns a [`Snapshot`] of the catalog.
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;

    /// Syncs with the catalog and returns the resulting in-memory state updates.
    // NOTE(review): presumably syncs up to the catalog's current upper — confirm.
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Syncs with the catalog relative to `target_upper` and returns the resulting in-memory
    /// state updates.
    // NOTE(review): presumably returns all updates up to `target_upper`; the behavior when the
    // catalog is already past it is not visible here — confirm.
    async fn sync_updates(
        &mut self,
        target_upper: Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Returns the current upper of the catalog. Unlike most methods here, this is infallible.
    async fn current_upper(&mut self) -> Timestamp;
}
281
282#[async_trait]
284#[allow(mismatched_lifetime_syntaxes)]
285pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
286    fn is_read_only(&self) -> bool;
288
289    fn is_savepoint(&self) -> bool;
291
292    fn mark_bootstrap_complete(&mut self);
294
295    async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
297
298    async fn commit_transaction(
306        &mut self,
307        txn_batch: TransactionBatch,
308        commit_ts: Timestamp,
309    ) -> Result<Timestamp, CatalogError>;
310
311    async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
315
316    #[mz_ore::instrument(level = "debug")]
320    async fn allocate_id(
321        &mut self,
322        id_type: &str,
323        amount: u64,
324        commit_ts: Timestamp,
325    ) -> Result<Vec<u64>, CatalogError> {
326        if amount == 0 {
327            return Ok(Vec::new());
328        }
329        let mut txn = self.transaction().await?;
330        let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
331        txn.commit_internal(commit_ts).await?;
332        Ok(ids)
333    }
334
335    async fn allocate_user_ids(
339        &mut self,
340        amount: u64,
341        commit_ts: Timestamp,
342    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
343        let ids = self
344            .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
345            .await?;
346        let ids = ids
347            .iter()
348            .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
349            .collect();
350        Ok(ids)
351    }
352
353    async fn allocate_user_id(
357        &mut self,
358        commit_ts: Timestamp,
359    ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
360        let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
361        let id = id.into_element();
362        Ok((CatalogItemId::User(id), GlobalId::User(id)))
363    }
364
365    async fn allocate_user_cluster_id(
369        &mut self,
370        commit_ts: Timestamp,
371    ) -> Result<ClusterId, CatalogError> {
372        let id = self
373            .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
374            .await?
375            .into_element();
376        Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
377    }
378}
379
380trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
381impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
382
/// An iterator over `(AuditLog, Timestamp)` pairs.
///
/// The concrete iterator is boxed behind a trait object, so the adapter chain assembled in
/// `AuditLogIterator::new` does not appear in this type.
#[derive(Debug)]
pub struct AuditLogIterator {
    /// The underlying iterator; `Iterator::next` on this type delegates to it.
    audit_logs: Box<dyn AuditLogIteratorTrait>,
}
390
391impl AuditLogIterator {
392    fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
393        let audit_logs = audit_logs
394            .into_iter()
395            .map(|(kind, ts, diff)| {
396                assert_eq!(
397                    diff,
398                    Diff::ONE,
399                    "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
400                );
401                assert!(
402                    kind.is_audit_log(),
403                    "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
404                );
405                let id = kind.audit_log_id();
406                (kind, ts, id)
407            })
408            .sorted_by_key(|(_, ts, id)| (*ts, *id))
409            .map(|(kind, ts, _id)| (kind, ts))
410            .rev()
411            .map(|(kind, ts)| {
412                let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
414                let kind: Option<memory::objects::StateUpdateKind> = (&kind)
415                    .try_into()
416                    .expect("invalid persisted update: {update:#?}");
417                let kind = kind.expect("audit log always produces im-memory updates");
418                let audit_log = match kind {
419                    memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
420                    kind => unreachable!("invalid kind: {kind:?}"),
421                };
422                (audit_log, ts)
423            });
424        Self {
425            audit_logs: Box::new(audit_logs),
426        }
427    }
428}
429
430impl Iterator for AuditLogIterator {
431    type Item = (AuditLog, Timestamp);
432
433    fn next(&mut self) -> Option<Self::Item> {
434        self.audit_logs.next()
435    }
436}
437
/// Builder that collects the arguments of `persist_backed_catalog_state` and, via `build`,
/// produces an unopened catalog state.
// NOTE(review): the name suggests this is intended for tests only — confirm.
#[derive(Debug, Clone)]
pub struct TestCatalogStateBuilder {
    /// The persist client handed to the catalog state.
    persist_client: PersistClient,
    /// Organization ID; `new` generates a random one.
    organization_id: Uuid,
    /// Version handed to the catalog state; `new` sets it to `0.0.0`.
    version: semver::Version,
    /// Optional deploy generation; `new` leaves it as `None`.
    deploy_generation: Option<u64>,
    /// Metrics handed to the catalog state; `new` creates them from a fresh `MetricsRegistry`.
    metrics: Arc<Metrics>,
}
447
448impl TestCatalogStateBuilder {
449    pub fn new(persist_client: PersistClient) -> Self {
450        Self {
451            persist_client,
452            organization_id: Uuid::new_v4(),
453            version: semver::Version::new(0, 0, 0),
454            deploy_generation: None,
455            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
456        }
457    }
458
459    pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
460        self.organization_id = organization_id;
461        self
462    }
463
464    pub fn with_version(mut self, version: semver::Version) -> Self {
465        self.version = version;
466        self
467    }
468
469    pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
470        self.deploy_generation = Some(deploy_generation);
471        self
472    }
473
474    pub fn with_default_deploy_generation(self) -> Self {
475        self.with_deploy_generation(0)
476    }
477
478    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
479        self.metrics = metrics;
480        self
481    }
482
483    pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
484        persist_backed_catalog_state(
485            self.persist_client,
486            self.organization_id,
487            self.version,
488            self.deploy_generation,
489            self.metrics,
490        )
491        .await
492    }
493
494    pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
495        self.expect_build("failed to build").await
496    }
497
498    pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
499        self.build().await.expect(msg)
500    }
501}
502
503pub async fn persist_backed_catalog_state(
507    persist_client: PersistClient,
508    organization_id: Uuid,
509    version: semver::Version,
510    deploy_generation: Option<u64>,
511    metrics: Arc<Metrics>,
512) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
513    let state = UnopenedPersistCatalogState::new(
514        persist_client,
515        organization_id,
516        version,
517        deploy_generation,
518        metrics,
519    )
520    .await?;
521    Ok(Box::new(state))
522}
523
524pub fn test_bootstrap_args() -> BootstrapArgs {
525    BootstrapArgs {
526        default_cluster_replica_size: "scale=1,workers=1".into(),
527        default_cluster_replication_factor: 1,
528        bootstrap_role: None,
529        cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
530    }
531}