Skip to main content

mz_catalog/
durable.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This crate is responsible for durably storing and modifying the catalog contents.
11
12use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_persist_types::ShardId;
25use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlScalarType};
26use mz_sql::catalog::CatalogError as SqlCatalogError;
27use uuid::Uuid;
28
29use crate::config::ClusterReplicaSizeMap;
30use crate::durable::debug::{DebugCatalogState, Trace};
31pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
32pub use crate::durable::metrics::Metrics;
33pub use crate::durable::objects::state_update::StateUpdate;
34use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
35use crate::durable::objects::{AuditLog, Snapshot};
36pub use crate::durable::objects::{
37    Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
38    Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
39    ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
40    StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
41    UnfinalizedShard,
42};
43pub use crate::durable::persist::shard_id;
44use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
45pub use crate::durable::transaction::Transaction;
46use crate::durable::transaction::TransactionBatch;
47pub use crate::durable::upgrade::CATALOG_VERSION;
48use crate::memory;
49
50pub mod debug;
51mod error;
52pub mod initialize;
53mod metrics;
54pub mod objects;
55mod persist;
56mod traits;
57mod transaction;
58mod upgrade;
59
// Keys naming the ID allocators kept in the catalog. Within this file they are passed as the
// `id_type` argument of `get_next_id` and `allocate_id` (e.g. `USER_ITEM_ALLOC_KEY`,
// `SYSTEM_REPLICA_ID_ALLOC_KEY`, `USER_CLUSTER_ID_ALLOC_KEY`).
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
pub const USER_ITEM_ALLOC_KEY: &str = "user";
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
pub const OID_ALLOC_KEY: &str = "oid";
// NOTE(review): the keys below are not named like ID allocators; presumably they identify
// other catalog-level entries (content version, shard IDs, a test nonce) — confirm against
// their users elsewhere in the crate.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
77
/// Arguments handed to the `open*` methods of [`OpenableDurableCatalogState`] when opening
/// (and possibly initializing) a catalog.
#[derive(Clone, Debug)]
pub struct BootstrapArgs {
    /// The set of known cluster replica sizes.
    pub cluster_replica_size_map: ClusterReplicaSizeMap,
    /// Name of the default replica size; presumably a key of `cluster_replica_size_map`
    /// (TODO confirm).
    pub default_cluster_replica_size: String,
    /// Default replication factor for clusters; presumably applied to clusters created during
    /// bootstrap (TODO confirm).
    pub default_cluster_replication_factor: u32,
    /// Optional name of a role to create/use during bootstrap; `None` means no such role
    /// (NOTE(review): exact semantics live in the initialization code — confirm).
    pub bootstrap_role: Option<String>,
}
85
/// The fencing token returned by the `epoch` methods of the catalog state traits; it is a
/// signed 64-bit integer that can never be zero.
pub type Epoch = NonZeroI64;
87
/// An API for opening a durable catalog state.
///
/// Every `open*` method consumes the handle. If a catalog is not opened, then its resources
/// should be released via [`Self::expire`].
#[async_trait]
pub trait OpenableDurableCatalogState: Debug + Send {
    // TODO(jkosh44) Teaching savepoint mode how to listen to additional
    // durable updates will be necessary for zero down time upgrades.
    /// Opens the catalog in a mode that accepts and buffers all writes,
    /// but never durably commits them. This is used to check and see if
    /// opening the catalog would be successful, without making any durable
    /// changes.
    ///
    /// Once a savepoint catalog reads an initial snapshot from durable
    /// storage, it will never read another update from durable storage. As a
    /// consequence, savepoint catalogs can never be fenced.
    ///
    /// Will return an error in the following scenarios:
    ///   - Catalog initialization fails.
    ///   - Catalog migrations fail.
    ///
    /// `initial_ts` is used as the initial timestamp for new environments.
    ///
    /// Also returns an [`AuditLogIterator`] over the audit log events.
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog in read only mode. All mutating methods
    /// will return an error.
    ///
    /// If the catalog is uninitialized or requires migrations, then
    /// it will fail to open in read only mode.
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;

    /// Opens the catalog in a writeable mode. Initializes the catalog, if it
    /// has not been initialized, and performs any migrations needed.
    ///
    /// `initial_ts` is used as the initial timestamp for new environments.
    ///
    /// Also returns an [`AuditLogIterator`] over the audit log events.
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog for manual editing of the underlying data. This is helpful for
    /// fixing a corrupt catalog.
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;

    /// Reports if the catalog state has been initialized.
    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;

    /// Returns the epoch of the current durable catalog state. The epoch acts as
    /// a fencing token to prevent split brain issues across two
    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
    /// catalog, it will increment the epoch by one (or initialize it to some
    /// value if there's no existing epoch) and store the value in memory. It's
    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
    /// for their epoch.
    ///
    /// NB: We may remove this in later iterations of Pv2.
    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;

    /// Get the most recent deployment generation written to the catalog. Not necessarily the
    /// deploy generation of this instance.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Get the `with_0dt_deployment_max_wait` config value of this instance.
    ///
    /// This mirrors the `with_0dt_deployment_max_wait` "system var" so that we can
    /// toggle the flag with LaunchDarkly, but use it in boot before
    /// LaunchDarkly is available.
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;

    /// Get the `with_0dt_deployment_ddl_check_interval` config value of this instance.
    ///
    /// This mirrors the `with_0dt_deployment_ddl_check_interval` "system var" so that we can
    /// toggle the flag with LaunchDarkly, but use it in boot before
    /// LaunchDarkly is available.
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError>;

    /// Get the `enable_0dt_deployment_panic_after_timeout` config value of this
    /// instance.
    ///
    /// This mirrors the `enable_0dt_deployment_panic_after_timeout` "system var"
    /// so that we can toggle the flag with LaunchDarkly, but use it in boot
    /// before LaunchDarkly is available.
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError>;

    /// Reports if the remote configuration was synchronized at least once.
    ///
    /// Note that, unlike the other methods of this trait, the error type here is
    /// [`DurableCatalogError`] rather than [`CatalogError`].
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;

    /// Generate an unconsolidated [`Trace`] of catalog contents.
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Generate a consolidated [`Trace`] of catalog contents.
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Politely releases all external resources that can only be released in an async context.
    async fn expire(self: Box<Self>);
}
200
/// A read only API for the durable catalog state.
///
/// Several methods take `&mut self` even though they are reads; implementations may need to
/// mutate internal state to serve them (NOTE(review): e.g. to sync with durable storage —
/// confirm against the implementation).
#[async_trait]
pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
    /// Returns the epoch of the current durable catalog state. The epoch acts as
    /// a fencing token to prevent split brain issues across two
    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
    /// catalog, it will increment the epoch by one (or initialize it to some
    /// value if there's no existing epoch) and store the value in memory. It's
    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
    /// for their epoch.
    ///
    /// NB: We may remove this in later iterations of Pv2.
    fn epoch(&self) -> Epoch;

    /// Returns the metrics for this catalog state.
    fn metrics(&self) -> &Metrics;

    /// Politely releases all external resources that can only be released in an async context.
    async fn expire(self: Box<Self>);

    /// Returns true if the system bootstrapping process is complete, false otherwise.
    fn is_bootstrap_complete(&self) -> bool;

    /// Get all audit log events.
    ///
    /// Results are guaranteed to be sorted by ID.
    ///
    /// WARNING: This is meant for use in integration tests and has bad performance.
    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;

    /// Get the next ID of `id_type`, without allocating it.
    ///
    /// `id_type` is one of the `*_ALLOC_KEY` constants, as used by the default methods below.
    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;

    /// Get the next user item ID without allocating it.
    ///
    /// Shorthand for [`Self::get_next_id`] with [`USER_ITEM_ALLOC_KEY`].
    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_ITEM_ALLOC_KEY).await
    }

    /// Get the next system item ID without allocating it.
    ///
    /// Shorthand for [`Self::get_next_id`] with [`SYSTEM_ITEM_ALLOC_KEY`].
    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
    }

    /// Get the next system replica id without allocating it.
    ///
    /// Shorthand for [`Self::get_next_id`] with [`SYSTEM_REPLICA_ID_ALLOC_KEY`].
    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
    }

    /// Get the next user replica id without allocating it.
    ///
    /// Shorthand for [`Self::get_next_id`] with [`USER_REPLICA_ID_ALLOC_KEY`].
    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
    }

    /// Get the deployment generation of this instance.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Get a snapshot of the catalog.
    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;

    /// Listen and return all updates that are currently in the catalog.
    ///
    /// IMPORTANT: This excludes updates to storage usage.
    ///
    /// Returns an error if this instance has been fenced out.
    async fn sync_to_current_updates(
        &mut self,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    // TODO(jkosh44) The fact that the timestamp argument is an exclusive upper bound makes
    // it difficult to use for readers. For now it's correct and easy to implement, but we should
    // consider a better API.
    /// Listen and return all updates in the catalog up to `target_upper`.
    ///
    /// `target_upper` is an exclusive upper bound.
    ///
    /// IMPORTANT: This excludes updates to storage usage.
    ///
    /// Returns an error if this instance has been fenced out.
    async fn sync_updates(
        &mut self,
        target_upper: Timestamp,
    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;

    /// Fetch the current upper of the catalog state.
    async fn current_upper(&mut self) -> Timestamp;
}
285
286/// A read-write API for the durable catalog state.
287#[async_trait]
288#[allow(mismatched_lifetime_syntaxes)]
289pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
290    /// Returns true if the catalog is opened in read only mode, false otherwise.
291    fn is_read_only(&self) -> bool;
292
293    /// Returns true if the catalog is opened is savepoint mode, false otherwise.
294    fn is_savepoint(&self) -> bool;
295
296    /// Marks the bootstrap process as complete.
297    async fn mark_bootstrap_complete(&mut self);
298
299    /// Creates a new durable catalog state transaction.
300    async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
301
302    /// Commits a durable catalog state transaction. The transaction will be committed at
303    /// `commit_ts`.
304    ///
305    /// Returns what the upper was directly after the transaction committed.
306    ///
307    /// Panics if `commit_ts` is not greater than or equal to the most recent upper seen by this
308    /// process.
309    async fn commit_transaction(
310        &mut self,
311        txn_batch: TransactionBatch,
312        commit_ts: Timestamp,
313    ) -> Result<Timestamp, CatalogError>;
314
315    /// Confirms that this catalog is connected as the current leader.
316    ///
317    /// NB: We may remove this in later iterations of Pv2.
318    async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
319
320    /// Allocates and returns `amount` IDs of `id_type`.
321    ///
322    /// See [`Self::commit_transaction`] for details on `commit_ts`.
323    #[mz_ore::instrument(level = "debug")]
324    async fn allocate_id(
325        &mut self,
326        id_type: &str,
327        amount: u64,
328        commit_ts: Timestamp,
329    ) -> Result<Vec<u64>, CatalogError> {
330        let start = Instant::now();
331        if amount == 0 {
332            return Ok(Vec::new());
333        }
334        let mut txn = self.transaction().await?;
335        let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
336        txn.commit_internal(commit_ts).await?;
337        self.metrics()
338            .allocate_id_seconds
339            .observe(start.elapsed().as_secs_f64());
340        Ok(ids)
341    }
342
343    /// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
344    ///
345    /// See [`Self::commit_transaction`] for details on `commit_ts`.
346    async fn allocate_user_ids(
347        &mut self,
348        amount: u64,
349        commit_ts: Timestamp,
350    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
351        let ids = self
352            .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
353            .await?;
354        let ids = ids
355            .iter()
356            .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
357            .collect();
358        Ok(ids)
359    }
360
361    /// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
362    ///
363    /// See [`Self::commit_transaction`] for details on `commit_ts`.
364    async fn allocate_user_id(
365        &mut self,
366        commit_ts: Timestamp,
367    ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
368        let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
369        let id = id.into_element();
370        Ok((CatalogItemId::User(id), GlobalId::User(id)))
371    }
372
373    /// Allocates and returns a user [`ClusterId`].
374    ///
375    /// See [`Self::commit_transaction`] for details on `commit_ts`.
376    async fn allocate_user_cluster_id(
377        &mut self,
378        commit_ts: Timestamp,
379    ) -> Result<ClusterId, CatalogError> {
380        let id = self
381            .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
382            .await?
383            .into_element();
384        Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
385    }
386
387    fn shard_id(&self) -> ShardId;
388}
389
390trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
391impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
392
/// An iterator that returns audit log events in reverse ID order.
///
/// Each item is an audit log event paired with a [`Timestamp`].
#[derive(Debug)]
pub struct AuditLogIterator {
    // We store an iterator instead of a sorted `Vec`, so we can lazily sort the contents on the
    // first call to `next`, instead of sorting the contents on initialization.
    //
    // NOTE(review): `Itertools::sorted_by_key` is documented to consume and sort its input when
    // it is called, so the sort appears to happen in `new`; only the per-event decoding looks
    // deferred to `next` — confirm and update this comment if so.
    audit_logs: Box<dyn AuditLogIteratorTrait>,
}
400
401impl AuditLogIterator {
402    fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
403        let audit_logs = audit_logs
404            .into_iter()
405            .map(|(kind, ts, diff)| {
406                assert_eq!(
407                    diff,
408                    Diff::ONE,
409                    "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
410                );
411                assert!(
412                    kind.is_audit_log(),
413                    "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
414                );
415                let id = kind.audit_log_id();
416                (kind, ts, id)
417            })
418            .sorted_by_key(|(_, ts, id)| (*ts, *id))
419            .map(|(kind, ts, _id)| (kind, ts))
420            .rev()
421            .map(|(kind, ts)| {
422                // Each event will be deserialized lazily on a call to `next`.
423                let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
424                let kind: Option<memory::objects::StateUpdateKind> = (&kind)
425                    .try_into()
426                    .expect("invalid persisted update: {update:#?}");
427                let kind = kind.expect("audit log always produces im-memory updates");
428                let audit_log = match kind {
429                    memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
430                    kind => unreachable!("invalid kind: {kind:?}"),
431                };
432                (audit_log, ts)
433            });
434        Self {
435            audit_logs: Box::new(audit_logs),
436        }
437    }
438}
439
440impl Iterator for AuditLogIterator {
441    type Item = (AuditLog, Timestamp);
442
443    fn next(&mut self) -> Option<Self::Item> {
444        self.audit_logs.next()
445    }
446}
447
448/// Returns the schema of the `Row`s/`SourceData`s stored in the persist
449/// shard backing the catalog.
450pub fn persist_desc() -> RelationDesc {
451    RelationDesc::builder()
452        .with_column("data", SqlScalarType::Jsonb.nullable(false))
453        .finish()
454}
455
/// A builder to help create an [`OpenableDurableCatalogState`] for tests.
///
/// Every field is forwarded unchanged to [`persist_backed_catalog_state`] by `build`.
#[derive(Debug, Clone)]
pub struct TestCatalogStateBuilder {
    /// Persist client used to create the catalog state.
    persist_client: PersistClient,
    /// Organization ID; a random UUID unless overridden via `with_organization_id`.
    organization_id: Uuid,
    /// Version; `0.0.0` unless overridden via `with_version`.
    version: semver::Version,
    /// Deploy generation; `None` unless set via `with_deploy_generation` or
    /// `with_default_deploy_generation`.
    deploy_generation: Option<u64>,
    /// Metrics handle; backed by a fresh `MetricsRegistry` unless overridden via
    /// `with_metrics`.
    metrics: Arc<Metrics>,
}
465
466impl TestCatalogStateBuilder {
467    pub fn new(persist_client: PersistClient) -> Self {
468        Self {
469            persist_client,
470            organization_id: Uuid::new_v4(),
471            version: semver::Version::new(0, 0, 0),
472            deploy_generation: None,
473            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
474        }
475    }
476
477    pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
478        self.organization_id = organization_id;
479        self
480    }
481
482    pub fn with_version(mut self, version: semver::Version) -> Self {
483        self.version = version;
484        self
485    }
486
487    pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
488        self.deploy_generation = Some(deploy_generation);
489        self
490    }
491
492    pub fn with_default_deploy_generation(self) -> Self {
493        self.with_deploy_generation(0)
494    }
495
496    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
497        self.metrics = metrics;
498        self
499    }
500
501    pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
502        persist_backed_catalog_state(
503            self.persist_client,
504            self.organization_id,
505            self.version,
506            self.deploy_generation,
507            self.metrics,
508        )
509        .await
510    }
511
512    pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
513        self.expect_build("failed to build").await
514    }
515
516    pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
517        self.build().await.expect(msg)
518    }
519}
520
521/// Creates an openable durable catalog state implemented using persist.
522///
523/// `deploy_generation` MUST be `Some` to initialize a new catalog.
524pub async fn persist_backed_catalog_state(
525    persist_client: PersistClient,
526    organization_id: Uuid,
527    version: semver::Version,
528    deploy_generation: Option<u64>,
529    metrics: Arc<Metrics>,
530) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
531    let state = UnopenedPersistCatalogState::new(
532        persist_client,
533        organization_id,
534        version,
535        deploy_generation,
536        metrics,
537    )
538    .await?;
539    Ok(Box::new(state))
540}
541
542pub fn test_bootstrap_args() -> BootstrapArgs {
543    BootstrapArgs {
544        default_cluster_replica_size: "scale=1,workers=1".into(),
545        default_cluster_replication_factor: 1,
546        bootstrap_role: None,
547        cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
548    }
549}