Skip to main content

mz_catalog/
durable.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This crate is responsible for durably storing and modifying the catalog contents.
11
12use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_persist_types::ShardId;
25use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlScalarType};
26use mz_sql::catalog::CatalogError as SqlCatalogError;
27use uuid::Uuid;
28
29use crate::config::ClusterReplicaSizeMap;
30use crate::durable::debug::{DebugCatalogState, Trace};
31pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
32pub use crate::durable::metrics::Metrics;
33use crate::durable::objects::AuditLog;
34pub use crate::durable::objects::Snapshot;
35pub use crate::durable::objects::state_update::StateUpdate;
36use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
37pub use crate::durable::objects::{
38    Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
39    Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
40    ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
41    StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
42    UnfinalizedShard,
43};
44pub use crate::durable::persist::shard_id;
45use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
46pub use crate::durable::transaction::Transaction;
47use crate::durable::transaction::TransactionBatch;
48pub use crate::durable::upgrade::CATALOG_VERSION;
49use crate::memory;
50
51pub mod debug;
52mod error;
53pub mod initialize;
54mod metrics;
55pub mod objects;
56mod persist;
57mod traits;
58mod transaction;
59mod upgrade;
60
// Keys naming the ID allocators of the durable catalog. Within this file they are passed as the
// `id_type` argument of `get_next_id` and `allocate_id`.
//
// NOTE(review): the string values are presumably persisted, so changing one would likely require
// a migration — confirm before editing any value.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
pub const USER_ITEM_ALLOC_KEY: &str = "user";
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
// Note that the two cluster allocator keys use the word "compute" rather than "cluster" in
// their values.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
pub const OID_ALLOC_KEY: &str = "oid";
// Keys for individual named values rather than ID allocators.
// NOTE(review): where and how these values are stored is not visible in this file.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
78
/// Arguments passed to the `open`, `open_savepoint`, and `open_read_only` methods of
/// [`OpenableDurableCatalogState`].
///
/// NOTE(review): the field descriptions below are inferred from the field names and types only;
/// confirm them against the catalog initialization code.
#[derive(Clone, Debug)]
pub struct BootstrapArgs {
    /// The known cluster replica sizes.
    pub cluster_replica_size_map: ClusterReplicaSizeMap,
    /// Name of the replica size to use by default (presumably a key of
    /// `cluster_replica_size_map` — TODO confirm).
    pub default_cluster_replica_size: String,
    /// Replication factor to use by default for clusters.
    pub default_cluster_replication_factor: u32,
    /// Optional name of a role to create while bootstrapping (presumably; TODO confirm).
    pub bootstrap_role: Option<String>,
}
86
/// The fencing token of a durable catalog state, as returned by the `epoch` methods of
/// [`OpenableDurableCatalogState`] and [`ReadOnlyDurableCatalogState`].
pub type Epoch = NonZeroI64;
88
/// An API for opening a durable catalog state.
///
/// Each `open*` method consumes the unopened state. If a catalog is not opened, then its
/// resources should be released via [`Self::expire`].
#[async_trait]
pub trait OpenableDurableCatalogState: Debug + Send {
    // TODO(jkosh44) Teaching savepoint mode how to listen to additional
    // durable updates will be necessary for zero down time upgrades.
    /// Opens the catalog in a mode that accepts and buffers all writes,
    /// but never durably commits them. This is used to check and see if
    /// opening the catalog would be successful, without making any durable
    /// changes.
    ///
    /// Once a savepoint catalog reads an initial snapshot from durable
    /// storage, it will never read another update from durable storage. As a
    /// consequence, savepoint catalogs can never be fenced.
    ///
    /// Will return an error in the following scenarios:
    ///   - Catalog initialization fails.
    ///   - Catalog migrations fail.
    ///
    /// `initial_ts` is used as the initial timestamp for new environments.
    ///
    /// Also returns an [`AuditLogIterator`] over the audit log events.
    async fn open_savepoint(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog in read only mode. All mutating methods
    /// will return an error.
    ///
    /// If the catalog is uninitialized or requires a migration, then
    /// it will fail to open in read only mode.
    async fn open_read_only(
        mut self: Box<Self>,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;

    /// Opens the catalog in a writeable mode. Initializes the catalog if it
    /// has not been initialized yet, and performs any migrations needed.
    ///
    /// `initial_ts` is used as the initial timestamp for new environments.
    ///
    /// Also returns an [`AuditLogIterator`] over the audit log events.
    async fn open(
        mut self: Box<Self>,
        initial_ts: Timestamp,
        bootstrap_args: &BootstrapArgs,
    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

    /// Opens the catalog for manual editing of the underlying data. This is helpful for
    /// fixing a corrupt catalog.
    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;

    /// Reports if the catalog state has been initialized.
    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;

    /// Returns the epoch of the current durable catalog state. The epoch acts as
    /// a fencing token to prevent split brain issues across two
    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
    /// catalog, it will increment the epoch by one (or initialize it to some
    /// value if there's no existing epoch) and store the value in memory. It's
    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
    /// for their epoch.
    ///
    /// NB: We may remove this in later iterations of Pv2.
    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;

    /// Get the most recent deployment generation written to the catalog. Not necessarily the
    /// deploy generation of this instance.
    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;

    /// Get the `with_0dt_deployment_max_wait` config value of this instance.
    ///
    /// This mirrors the `with_0dt_deployment_max_wait` "system var" so that we can
    /// toggle the flag with LaunchDarkly, but use it in boot before
    /// LaunchDarkly is available.
    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;

    /// Get the `with_0dt_deployment_ddl_check_interval` config value of this instance.
    ///
    /// This mirrors the `with_0dt_deployment_ddl_check_interval` "system var" so that we can
    /// toggle the flag with LaunchDarkly, but use it in boot before
    /// LaunchDarkly is available.
    async fn get_0dt_deployment_ddl_check_interval(
        &mut self,
    ) -> Result<Option<Duration>, CatalogError>;

    /// Get the `enable_0dt_deployment_panic_after_timeout` config value of this
    /// instance.
    ///
    /// This mirrors the `enable_0dt_deployment_panic_after_timeout` "system var"
    /// so that we can toggle the flag with LaunchDarkly, but use it in boot
    /// before LaunchDarkly is available.
    async fn get_enable_0dt_deployment_panic_after_timeout(
        &mut self,
    ) -> Result<Option<bool>, CatalogError>;

    /// Reports if the remote configuration was synchronized at least once.
    ///
    /// Note that, unlike the other methods of this trait, the error type is
    /// [`DurableCatalogError`] rather than [`CatalogError`].
    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;

    /// Generate an unconsolidated [`Trace`] of catalog contents.
    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Generate a consolidated [`Trace`] of catalog contents.
    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;

    /// Politely releases all external resources that can only be released in an async context.
    async fn expire(self: Box<Self>);
}
201
202/// A read only API for the durable catalog state.
203#[async_trait]
204pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
205    /// Returns the epoch of the current durable catalog state. The epoch acts as
206    /// a fencing token to prevent split brain issues across two
207    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
208    /// catalog, it will increment the epoch by one (or initialize it to some
209    /// value if there's no existing epoch) and store the value in memory. It's
210    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
211    /// for their epoch.
212    ///
213    /// NB: We may remove this in later iterations of Pv2.
214    fn epoch(&self) -> Epoch;
215
216    /// Returns the metrics for this catalog state.
217    fn metrics(&self) -> &Metrics;
218
219    /// Politely releases all external resources that can only be released in an async context.
220    async fn expire(self: Box<Self>);
221
222    /// Returns true if the system bootstrapping process is complete, false otherwise.
223    fn is_bootstrap_complete(&self) -> bool;
224
225    /// Get all audit log events.
226    ///
227    /// Results are guaranteed to be sorted by ID.
228    ///
229    /// WARNING: This is meant for use in integration tests and has bad performance.
230    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
231
232    /// Get the next ID of `id_type`, without allocating it.
233    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
234
235    /// Get the next user ID without allocating it.
236    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
237        self.get_next_id(USER_ITEM_ALLOC_KEY).await
238    }
239
240    /// Get the next system ID without allocating it.
241    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
242        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
243    }
244
245    /// Get the next system replica id without allocating it.
246    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
247        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
248    }
249
250    /// Get the next user replica id without allocating it.
251    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
252        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
253    }
254
255    /// Get the deployment generation of this instance.
256    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
257
258    /// Get a snapshot of the catalog.
259    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
260
261    /// Listen and return all updates that are currently in the catalog.
262    ///
263    /// IMPORTANT: This excludes updates to storage usage.
264    ///
265    /// Returns an error if this instance has been fenced out.
266    async fn sync_to_current_updates(
267        &mut self,
268    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
269
270    // TODO(jkosh44) The fact that the timestamp argument is an exclusive upper bound makes
271    // it difficult to use for readers. For now it's correct and easy to implement, but we should
272    // consider a better API.
273    /// Listen and return all updates in the catalog up to `target_upper`.
274    ///
275    /// IMPORTANT: This excludes updates to storage usage.
276    ///
277    /// Returns an error if this instance has been fenced out.
278    async fn sync_updates(
279        &mut self,
280        target_upper: Timestamp,
281    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
282
283    /// Fetch the current upper of the catalog state.
284    async fn current_upper(&mut self) -> Timestamp;
285}
286
287/// A read-write API for the durable catalog state.
288#[async_trait]
289#[allow(mismatched_lifetime_syntaxes)]
290pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
291    /// Returns true if the catalog is opened in read only mode, false otherwise.
292    fn is_read_only(&self) -> bool;
293
294    /// Returns true if the catalog is opened is savepoint mode, false otherwise.
295    fn is_savepoint(&self) -> bool;
296
297    /// Marks the bootstrap process as complete.
298    async fn mark_bootstrap_complete(&mut self);
299
300    /// Creates a new durable catalog state transaction.
301    async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
302
303    /// Creates a new transaction initialized from the given [`Snapshot`]
304    /// instead of reading from durable storage. Used for incremental DDL
305    /// dry runs where the transaction state from a previous dry run has been
306    /// saved and needs to be restored so it stays in sync with the accumulated
307    /// `CatalogState`.
308    fn transaction_from_snapshot(
309        &mut self,
310        snapshot: Snapshot,
311    ) -> Result<Transaction, CatalogError>;
312
313    /// Commits a durable catalog state transaction. The transaction will be committed at
314    /// `commit_ts`.
315    ///
316    /// Returns what the upper was directly after the transaction committed.
317    ///
318    /// Panics if `commit_ts` is not greater than or equal to the most recent upper seen by this
319    /// process.
320    async fn commit_transaction(
321        &mut self,
322        txn_batch: TransactionBatch,
323        commit_ts: Timestamp,
324    ) -> Result<Timestamp, CatalogError>;
325
326    /// Advances the upper of the catalog shard to `new_upper`.
327    ///
328    /// This implicitly confirms leadership, as attempting to advance the catalog frontier will
329    /// fail if the writer has been fenced out.
330    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError>;
331
332    /// Allocates and returns `amount` IDs of `id_type`.
333    ///
334    /// See [`Self::commit_transaction`] for details on `commit_ts`.
335    #[mz_ore::instrument(level = "debug")]
336    async fn allocate_id(
337        &mut self,
338        id_type: &str,
339        amount: u64,
340        commit_ts: Timestamp,
341    ) -> Result<Vec<u64>, CatalogError> {
342        let start = Instant::now();
343        if amount == 0 {
344            return Ok(Vec::new());
345        }
346        let mut txn = self.transaction().await?;
347        let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
348        txn.commit_internal(commit_ts).await?;
349        self.metrics()
350            .allocate_id_seconds
351            .observe(start.elapsed().as_secs_f64());
352        Ok(ids)
353    }
354
355    /// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
356    ///
357    /// See [`Self::commit_transaction`] for details on `commit_ts`.
358    async fn allocate_user_ids(
359        &mut self,
360        amount: u64,
361        commit_ts: Timestamp,
362    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
363        let ids = self
364            .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
365            .await?;
366        let ids = ids
367            .iter()
368            .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
369            .collect();
370        Ok(ids)
371    }
372
373    /// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
374    ///
375    /// See [`Self::commit_transaction`] for details on `commit_ts`.
376    async fn allocate_user_id(
377        &mut self,
378        commit_ts: Timestamp,
379    ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
380        let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
381        let id = id.into_element();
382        Ok((CatalogItemId::User(id), GlobalId::User(id)))
383    }
384
385    /// Allocates and returns a user [`ClusterId`].
386    ///
387    /// See [`Self::commit_transaction`] for details on `commit_ts`.
388    async fn allocate_user_cluster_id(
389        &mut self,
390        commit_ts: Timestamp,
391    ) -> Result<ClusterId, CatalogError> {
392        let id = self
393            .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
394            .await?
395            .into_element();
396        Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
397    }
398
399    fn shard_id(&self) -> ShardId;
400}
401
402trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
403impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
404
/// An iterator that returns audit log events in reverse ID order.
#[derive(Debug)]
pub struct AuditLogIterator {
    // We store a boxed iterator instead of a `Vec` of decoded events, so each event can be
    // deserialized lazily on the call to `next` that yields it, instead of deserializing all of
    // the contents on initialization.
    //
    // NOTE(review): itertools' `sorted_by_key`, used to build this iterator, sorts eagerly, so
    // the sort itself happens on initialization; only the deserialization is deferred.
    audit_logs: Box<dyn AuditLogIteratorTrait>,
}
412
413impl AuditLogIterator {
414    fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
415        let audit_logs = audit_logs
416            .into_iter()
417            .map(|(kind, ts, diff)| {
418                assert_eq!(
419                    diff,
420                    Diff::ONE,
421                    "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
422                );
423                assert!(
424                    kind.is_audit_log(),
425                    "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
426                );
427                let id = kind.audit_log_id();
428                (kind, ts, id)
429            })
430            .sorted_by_key(|(_, ts, id)| (*ts, *id))
431            .map(|(kind, ts, _id)| (kind, ts))
432            .rev()
433            .map(|(kind, ts)| {
434                // Each event will be deserialized lazily on a call to `next`.
435                let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
436                let kind: Option<memory::objects::StateUpdateKind> = (&kind)
437                    .try_into()
438                    .expect("invalid persisted update: {update:#?}");
439                let kind = kind.expect("audit log always produces im-memory updates");
440                let audit_log = match kind {
441                    memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
442                    kind => unreachable!("invalid kind: {kind:?}"),
443                };
444                (audit_log, ts)
445            });
446        Self {
447            audit_logs: Box::new(audit_logs),
448        }
449    }
450}
451
452impl Iterator for AuditLogIterator {
453    type Item = (AuditLog, Timestamp);
454
455    fn next(&mut self) -> Option<Self::Item> {
456        self.audit_logs.next()
457    }
458}
459
460/// Returns the schema of the `Row`s/`SourceData`s stored in the persist
461/// shard backing the catalog.
462pub fn persist_desc() -> RelationDesc {
463    RelationDesc::builder()
464        .with_column("data", SqlScalarType::Jsonb.nullable(false))
465        .finish()
466}
467
/// A builder to help create an [`OpenableDurableCatalogState`] for tests.
///
/// Every field is forwarded unchanged to [`persist_backed_catalog_state`] by `build`.
#[derive(Debug, Clone)]
pub struct TestCatalogStateBuilder {
    /// Client for persist; supplied by the caller of `new`.
    persist_client: PersistClient,
    /// Organization ID; `new` defaults it to a random v4 UUID.
    organization_id: Uuid,
    /// Version; `new` defaults it to `0.0.0`.
    version: semver::Version,
    /// Deploy generation; `new` defaults it to `None`. It must be `Some` to initialize a new
    /// catalog (see [`persist_backed_catalog_state`]).
    deploy_generation: Option<u64>,
    /// Catalog metrics; `new` defaults them to metrics registered in a fresh registry.
    metrics: Arc<Metrics>,
}
477
478impl TestCatalogStateBuilder {
479    pub fn new(persist_client: PersistClient) -> Self {
480        Self {
481            persist_client,
482            organization_id: Uuid::new_v4(),
483            version: semver::Version::new(0, 0, 0),
484            deploy_generation: None,
485            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
486        }
487    }
488
489    pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
490        self.organization_id = organization_id;
491        self
492    }
493
494    pub fn with_version(mut self, version: semver::Version) -> Self {
495        self.version = version;
496        self
497    }
498
499    pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
500        self.deploy_generation = Some(deploy_generation);
501        self
502    }
503
504    pub fn with_default_deploy_generation(self) -> Self {
505        self.with_deploy_generation(0)
506    }
507
508    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
509        self.metrics = metrics;
510        self
511    }
512
513    pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
514        persist_backed_catalog_state(
515            self.persist_client,
516            self.organization_id,
517            self.version,
518            self.deploy_generation,
519            self.metrics,
520        )
521        .await
522    }
523
524    pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
525        self.expect_build("failed to build").await
526    }
527
528    pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
529        self.build().await.expect(msg)
530    }
531}
532
533/// Creates an openable durable catalog state implemented using persist.
534///
535/// `deploy_generation` MUST be `Some` to initialize a new catalog.
536pub async fn persist_backed_catalog_state(
537    persist_client: PersistClient,
538    organization_id: Uuid,
539    version: semver::Version,
540    deploy_generation: Option<u64>,
541    metrics: Arc<Metrics>,
542) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
543    let state = UnopenedPersistCatalogState::new(
544        persist_client,
545        organization_id,
546        version,
547        deploy_generation,
548        metrics,
549    )
550    .await?;
551    Ok(Box::new(state))
552}
553
554pub fn test_bootstrap_args() -> BootstrapArgs {
555    BootstrapArgs {
556        default_cluster_replica_size: "scale=1,workers=1".into(),
557        default_cluster_replication_factor: 1,
558        bootstrap_role: None,
559        cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
560    }
561}