mz_catalog/
durable.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! This module is responsible for durably storing and modifying the catalog contents.
11
12use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Diff, GlobalId};
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use uuid::Uuid;
27
28use crate::config::ClusterReplicaSizeMap;
29use crate::durable::debug::{DebugCatalogState, Trace};
30pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
31pub use crate::durable::metrics::Metrics;
32pub use crate::durable::objects::state_update::StateUpdate;
33use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34use crate::durable::objects::{AuditLog, Snapshot};
35pub use crate::durable::objects::{
36    Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
37    Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
38    ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
39    StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
40    UnfinalizedShard,
41};
42pub use crate::durable::persist::shard_id;
43use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
44pub use crate::durable::transaction::Transaction;
45use crate::durable::transaction::TransactionBatch;
46pub use crate::durable::upgrade::CATALOG_VERSION;
47use crate::memory;
48
49pub mod debug;
50mod error;
51pub mod initialize;
52mod metrics;
53pub mod objects;
54mod persist;
55mod traits;
56mod transaction;
57mod upgrade;
58
// ---- ID allocator keys -------------------------------------------------------------------
// Each key names one ID sequence; see `DurableCatalogState::allocate_id`.

/// ID allocator key for database IDs.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
/// ID allocator key for schema IDs.
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// ID allocator key for user catalog item IDs.
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// ID allocator key for system catalog item IDs.
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
/// ID allocator key for user role IDs.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// ID allocator key for user cluster IDs.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
/// ID allocator key for system cluster IDs.
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// ID allocator key for user cluster replica IDs.
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// ID allocator key for system cluster replica IDs.
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
/// ID allocator key for audit log event IDs.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
/// ID allocator key for storage usage event IDs.
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
/// ID allocator key for user network policy IDs.
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
/// ID allocator key for OIDs.
pub const OID_ALLOC_KEY: &str = "oid";

// ---- Other durable keys ------------------------------------------------------------------
// NOTE(review): these are not ID sequences; presumably they key durable settings entries.
// Confirm against their readers.

/// Key recording the catalog content version.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
/// Key recording the builtin migration shard.
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
/// Key recording the expression cache shard.
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
75
76#[derive(Clone, Debug)]
77pub struct BootstrapArgs {
78    pub cluster_replica_size_map: ClusterReplicaSizeMap,
79    pub default_cluster_replica_size: String,
80    pub default_cluster_replication_factor: u32,
81    pub bootstrap_role: Option<String>,
82}
83
/// Fencing token of a durable catalog state; see `OpenableDurableCatalogState::epoch` for how
/// it is assigned. Because the value is non-zero, `Option<Epoch>` needs no extra space.
pub type Epoch = NonZeroI64;
85
86/// An API for opening a durable catalog state.
87///
88/// If a catalog is not opened, then resources should be release via [`Self::expire`].
89#[async_trait]
90pub trait OpenableDurableCatalogState: Debug + Send {
91    // TODO(jkosh44) Teaching savepoint mode how to listen to additional
92    // durable updates will be necessary for zero down time upgrades.
93    /// Opens the catalog in a mode that accepts and buffers all writes,
94    /// but never durably commits them. This is used to check and see if
95    /// opening the catalog would be successful, without making any durable
96    /// changes.
97    ///
98    /// Once a savepoint catalog reads an initial snapshot from durable
99    /// storage, it will never read another update from durable storage. As a
100    /// consequence, savepoint catalogs can never be fenced.
101    ///
102    /// Will return an error in the following scenarios:
103    ///   - Catalog initialization fails.
104    ///   - Catalog migrations fail.
105    ///
106    /// `initial_ts` is used as the initial timestamp for new environments.
107    ///
108    /// Also returns a handle to a thread that is deserializing all of the audit logs.
109    async fn open_savepoint(
110        mut self: Box<Self>,
111        initial_ts: Timestamp,
112        bootstrap_args: &BootstrapArgs,
113    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
114
115    /// Opens the catalog in read only mode. All mutating methods
116    /// will return an error.
117    ///
118    /// If the catalog is uninitialized or requires a migrations, then
119    /// it will fail to open in read only mode.
120    async fn open_read_only(
121        mut self: Box<Self>,
122        bootstrap_args: &BootstrapArgs,
123    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
124
125    /// Opens the catalog in a writeable mode. Optionally initializes the
126    /// catalog, if it has not been initialized, and perform any migrations
127    /// needed.
128    ///
129    /// `initial_ts` is used as the initial timestamp for new environments.
130    ///
131    /// Also returns a handle to a thread that is deserializing all of the audit logs.
132    async fn open(
133        mut self: Box<Self>,
134        initial_ts: Timestamp,
135        bootstrap_args: &BootstrapArgs,
136    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
137
138    /// Opens the catalog for manual editing of the underlying data. This is helpful for
139    /// fixing a corrupt catalog.
140    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
141
142    /// Reports if the catalog state has been initialized.
143    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
144
145    /// Returns the epoch of the current durable catalog state. The epoch acts as
146    /// a fencing token to prevent split brain issues across two
147    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
148    /// catalog, it will increment the epoch by one (or initialize it to some
149    /// value if there's no existing epoch) and store the value in memory. It's
150    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
151    /// for their epoch.
152    ///
153    /// NB: We may remove this in later iterations of Pv2.
154    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
155
156    /// Get the most recent deployment generation written to the catalog. Not necessarily the
157    /// deploy generation of this instance.
158    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
159
160    /// Get the `enable_0dt_deployment` config value of this instance.
161    ///
162    /// This mirrors the `enable_0dt_deployment` "system var" so that we can
163    /// toggle the flag with LaunchDarkly, but use it in boot before
164    /// LaunchDarkly is available.
165    async fn get_enable_0dt_deployment(&mut self) -> Result<Option<bool>, CatalogError>;
166
167    /// Get the `with_0dt_deployment_max_wait` config value of this instance.
168    ///
169    /// This mirrors the `with_0dt_deployment_max_wait` "system var" so that we can
170    /// toggle the flag with LaunchDarkly, but use it in boot before
171    /// LaunchDarkly is available.
172    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
173
174    /// Get the `with_0dt_deployment_ddl_check_interval` config value of this instance.
175    ///
176    /// This mirrors the `with_0dt_deployment_ddl_check_interval` "system var" so that we can
177    /// toggle the flag with LaunchDarkly, but use it in boot before
178    /// LaunchDarkly is available.
179    async fn get_0dt_deployment_ddl_check_interval(
180        &mut self,
181    ) -> Result<Option<Duration>, CatalogError>;
182
183    /// Get the `enable_0dt_deployment_panic_after_timeout` config value of this
184    /// instance.
185    ///
186    /// This mirrors the `enable_0dt_deployment_panic_after_timeout` "system var"
187    /// so that we can toggle the flag with LaunchDarkly, but use it in boot
188    /// before LaunchDarkly is available.
189    async fn get_enable_0dt_deployment_panic_after_timeout(
190        &mut self,
191    ) -> Result<Option<bool>, CatalogError>;
192
193    /// Reports if the remote configuration was synchronized at least once.
194    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
195
196    /// Generate an unconsolidated [`Trace`] of catalog contents.
197    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
198
199    /// Generate a consolidated [`Trace`] of catalog contents.
200    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
201
202    /// Politely releases all external resources that can only be released in an async context.
203    async fn expire(self: Box<Self>);
204}
205
206/// A read only API for the durable catalog state.
207#[async_trait]
208pub trait ReadOnlyDurableCatalogState: Debug + Send {
209    /// Returns the epoch of the current durable catalog state. The epoch acts as
210    /// a fencing token to prevent split brain issues across two
211    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
212    /// catalog, it will increment the epoch by one (or initialize it to some
213    /// value if there's no existing epoch) and store the value in memory. It's
214    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
215    /// for their epoch.
216    ///
217    /// NB: We may remove this in later iterations of Pv2.
218    fn epoch(&self) -> Epoch;
219
220    /// Politely releases all external resources that can only be released in an async context.
221    async fn expire(self: Box<Self>);
222
223    /// Returns true if the system bootstrapping process is complete, false otherwise.
224    fn is_bootstrap_complete(&self) -> bool;
225
226    /// Get all audit log events.
227    ///
228    /// Results are guaranteed to be sorted by ID.
229    ///
230    /// WARNING: This is meant for use in integration tests and has bad performance.
231    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
232
233    /// Get the next ID of `id_type`, without allocating it.
234    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
235
236    /// Get the next user ID without allocating it.
237    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
238        self.get_next_id(USER_ITEM_ALLOC_KEY).await
239    }
240
241    /// Get the next system ID without allocating it.
242    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
243        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
244    }
245
246    /// Get the next system replica id without allocating it.
247    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
248        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
249    }
250
251    /// Get the next user replica id without allocating it.
252    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
253        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
254    }
255
256    /// Get the deployment generation of this instance.
257    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
258
259    /// Get a snapshot of the catalog.
260    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
261
262    /// Listen and return all updates that are currently in the catalog.
263    ///
264    /// IMPORTANT: This excludes updates to storage usage.
265    ///
266    /// Returns an error if this instance has been fenced out.
267    async fn sync_to_current_updates(
268        &mut self,
269    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
270
271    // TODO(jkosh44) The fact that the timestamp argument is an exclusive upper bound makes
272    // it difficult to use for readers. For now it's correct and easy to implement, but we should
273    // consider a better API.
274    /// Listen and return all updates in the catalog up to `target_upper`.
275    ///
276    /// IMPORTANT: This excludes updates to storage usage.
277    ///
278    /// Returns an error if this instance has been fenced out.
279    async fn sync_updates(
280        &mut self,
281        target_upper: Timestamp,
282    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
283
284    /// Fetch the current upper of the catalog state.
285    async fn current_upper(&mut self) -> Timestamp;
286}
287
288/// A read-write API for the durable catalog state.
289#[async_trait]
290#[allow(elided_named_lifetimes)]
291pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
292    /// Returns true if the catalog is opened in read only mode, false otherwise.
293    fn is_read_only(&self) -> bool;
294
295    /// Returns true if the catalog is opened is savepoint mode, false otherwise.
296    fn is_savepoint(&self) -> bool;
297
298    /// Marks the bootstrap process as complete.
299    fn mark_bootstrap_complete(&mut self);
300
301    /// Creates a new durable catalog state transaction.
302    async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
303
304    /// Commits a durable catalog state transaction. The transaction will be committed at
305    /// `commit_ts`.
306    ///
307    /// Returns what the upper was directly after the transaction committed.
308    ///
309    /// Panics if `commit_ts` is not greater than or equal to the most recent upper seen by this
310    /// process.
311    async fn commit_transaction(
312        &mut self,
313        txn_batch: TransactionBatch,
314        commit_ts: Timestamp,
315    ) -> Result<Timestamp, CatalogError>;
316
317    /// Confirms that this catalog is connected as the current leader.
318    ///
319    /// NB: We may remove this in later iterations of Pv2.
320    async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
321
322    /// Allocates and returns `amount` IDs of `id_type`.
323    ///
324    /// See [`Self::commit_transaction`] for details on `commit_ts`.
325    #[mz_ore::instrument(level = "debug")]
326    async fn allocate_id(
327        &mut self,
328        id_type: &str,
329        amount: u64,
330        commit_ts: Timestamp,
331    ) -> Result<Vec<u64>, CatalogError> {
332        if amount == 0 {
333            return Ok(Vec::new());
334        }
335        let mut txn = self.transaction().await?;
336        let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
337        txn.commit_internal(commit_ts).await?;
338        Ok(ids)
339    }
340
341    /// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
342    ///
343    /// See [`Self::commit_transaction`] for details on `commit_ts`.
344    async fn allocate_user_ids(
345        &mut self,
346        amount: u64,
347        commit_ts: Timestamp,
348    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
349        let ids = self
350            .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
351            .await?;
352        let ids = ids
353            .iter()
354            .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
355            .collect();
356        Ok(ids)
357    }
358
359    /// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
360    ///
361    /// See [`Self::commit_transaction`] for details on `commit_ts`.
362    async fn allocate_user_id(
363        &mut self,
364        commit_ts: Timestamp,
365    ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
366        let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
367        let id = id.into_element();
368        Ok((CatalogItemId::User(id), GlobalId::User(id)))
369    }
370
371    /// Allocates and returns a user [`ClusterId`].
372    ///
373    /// See [`Self::commit_transaction`] for details on `commit_ts`.
374    async fn allocate_user_cluster_id(
375        &mut self,
376        commit_ts: Timestamp,
377    ) -> Result<ClusterId, CatalogError> {
378        let id = self
379            .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
380            .await?
381            .into_element();
382        Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
383    }
384}
385
386trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
387impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
388
389/// An iterator that returns audit log events in reverse ID order.
390#[derive(Debug)]
391pub struct AuditLogIterator {
392    // We store an interator instead of a sorted `Vec`, so we can lazily sort the contents on the
393    // first call to `next`, instead of sorting the contents on initialization.
394    audit_logs: Box<dyn AuditLogIteratorTrait>,
395}
396
397impl AuditLogIterator {
398    fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
399        let audit_logs = audit_logs
400            .into_iter()
401            .map(|(kind, ts, diff)| {
402                assert_eq!(
403                    diff,
404                    Diff::ONE,
405                    "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
406                );
407                assert!(
408                    kind.is_audit_log(),
409                    "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
410                );
411                let id = kind.audit_log_id();
412                (kind, ts, id)
413            })
414            .sorted_by_key(|(_, ts, id)| (*ts, *id))
415            .map(|(kind, ts, _id)| (kind, ts))
416            .rev()
417            .map(|(kind, ts)| {
418                // Each event will be deserialized lazily on a call to `next`.
419                let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
420                let kind: Option<memory::objects::StateUpdateKind> = (&kind)
421                    .try_into()
422                    .expect("invalid persisted update: {update:#?}");
423                let kind = kind.expect("audit log always produces im-memory updates");
424                let audit_log = match kind {
425                    memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
426                    kind => unreachable!("invalid kind: {kind:?}"),
427                };
428                (audit_log, ts)
429            });
430        Self {
431            audit_logs: Box::new(audit_logs),
432        }
433    }
434}
435
436impl Iterator for AuditLogIterator {
437    type Item = (AuditLog, Timestamp);
438
439    fn next(&mut self) -> Option<Self::Item> {
440        self.audit_logs.next()
441    }
442}
443
444/// A builder to help create an [`OpenableDurableCatalogState`] for tests.
445#[derive(Debug, Clone)]
446pub struct TestCatalogStateBuilder {
447    persist_client: PersistClient,
448    organization_id: Uuid,
449    version: semver::Version,
450    deploy_generation: Option<u64>,
451    metrics: Arc<Metrics>,
452}
453
454impl TestCatalogStateBuilder {
455    pub fn new(persist_client: PersistClient) -> Self {
456        Self {
457            persist_client,
458            organization_id: Uuid::new_v4(),
459            version: semver::Version::new(0, 0, 0),
460            deploy_generation: None,
461            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
462        }
463    }
464
465    pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
466        self.organization_id = organization_id;
467        self
468    }
469
470    pub fn with_version(mut self, version: semver::Version) -> Self {
471        self.version = version;
472        self
473    }
474
475    pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
476        self.deploy_generation = Some(deploy_generation);
477        self
478    }
479
480    pub fn with_default_deploy_generation(self) -> Self {
481        self.with_deploy_generation(0)
482    }
483
484    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
485        self.metrics = metrics;
486        self
487    }
488
489    pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
490        persist_backed_catalog_state(
491            self.persist_client,
492            self.organization_id,
493            self.version,
494            self.deploy_generation,
495            self.metrics,
496        )
497        .await
498    }
499
500    pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
501        self.expect_build("failed to build").await
502    }
503
504    pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
505        self.build().await.expect(msg)
506    }
507}
508
509/// Creates an openable durable catalog state implemented using persist.
510///
511/// `deploy_generation` MUST be `Some` to initialize a new catalog.
512pub async fn persist_backed_catalog_state(
513    persist_client: PersistClient,
514    organization_id: Uuid,
515    version: semver::Version,
516    deploy_generation: Option<u64>,
517    metrics: Arc<Metrics>,
518) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
519    let state = UnopenedPersistCatalogState::new(
520        persist_client,
521        organization_id,
522        version,
523        deploy_generation,
524        metrics,
525    )
526    .await?;
527    Ok(Box::new(state))
528}
529
530pub fn test_bootstrap_args() -> BootstrapArgs {
531    BootstrapArgs {
532        default_cluster_replica_size: "1".into(),
533        default_cluster_replication_factor: 2,
534        bootstrap_role: None,
535        cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
536    }
537}