mz_catalog/
durable.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This crate is responsible for durably storing and modifying the catalog contents.
11
12use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::Duration;
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Diff, GlobalId};
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use uuid::Uuid;
27
28use crate::config::ClusterReplicaSizeMap;
29use crate::durable::debug::{DebugCatalogState, Trace};
30pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
31pub use crate::durable::metrics::Metrics;
32pub use crate::durable::objects::state_update::StateUpdate;
33use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34use crate::durable::objects::{AuditLog, Snapshot};
35pub use crate::durable::objects::{
36    Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
37    Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
38    ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
39    StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
40    UnfinalizedShard,
41};
42pub use crate::durable::persist::shard_id;
43use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
44pub use crate::durable::transaction::Transaction;
45use crate::durable::transaction::TransactionBatch;
46pub use crate::durable::upgrade::CATALOG_VERSION;
47use crate::memory;
48
49pub mod debug;
50mod error;
51pub mod initialize;
52mod metrics;
53pub mod objects;
54mod persist;
55mod traits;
56mod transaction;
57mod upgrade;
58
59pub const DATABASE_ID_ALLOC_KEY: &str = "database";
60pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
61pub const USER_ITEM_ALLOC_KEY: &str = "user";
62pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
63pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
64pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
65pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
66pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
67pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
68pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
69pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
70pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
71pub const OID_ALLOC_KEY: &str = "oid";
72pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
73pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
74pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
75pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
76
77#[derive(Clone, Debug)]
78pub struct BootstrapArgs {
79    pub cluster_replica_size_map: ClusterReplicaSizeMap,
80    pub default_cluster_replica_size: String,
81    pub default_cluster_replication_factor: u32,
82    pub bootstrap_role: Option<String>,
83}
84
85pub type Epoch = NonZeroI64;
86
87/// An API for opening a durable catalog state.
88///
89/// If a catalog is not opened, then resources should be release via [`Self::expire`].
90#[async_trait]
91pub trait OpenableDurableCatalogState: Debug + Send {
92    // TODO(jkosh44) Teaching savepoint mode how to listen to additional
93    // durable updates will be necessary for zero down time upgrades.
94    /// Opens the catalog in a mode that accepts and buffers all writes,
95    /// but never durably commits them. This is used to check and see if
96    /// opening the catalog would be successful, without making any durable
97    /// changes.
98    ///
99    /// Once a savepoint catalog reads an initial snapshot from durable
100    /// storage, it will never read another update from durable storage. As a
101    /// consequence, savepoint catalogs can never be fenced.
102    ///
103    /// Will return an error in the following scenarios:
104    ///   - Catalog initialization fails.
105    ///   - Catalog migrations fail.
106    ///
107    /// `initial_ts` is used as the initial timestamp for new environments.
108    ///
109    /// Also returns a handle to a thread that is deserializing all of the audit logs.
110    async fn open_savepoint(
111        mut self: Box<Self>,
112        initial_ts: Timestamp,
113        bootstrap_args: &BootstrapArgs,
114    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
115
116    /// Opens the catalog in read only mode. All mutating methods
117    /// will return an error.
118    ///
119    /// If the catalog is uninitialized or requires a migrations, then
120    /// it will fail to open in read only mode.
121    async fn open_read_only(
122        mut self: Box<Self>,
123        bootstrap_args: &BootstrapArgs,
124    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
125
126    /// Opens the catalog in a writeable mode. Optionally initializes the
127    /// catalog, if it has not been initialized, and perform any migrations
128    /// needed.
129    ///
130    /// `initial_ts` is used as the initial timestamp for new environments.
131    ///
132    /// Also returns a handle to a thread that is deserializing all of the audit logs.
133    async fn open(
134        mut self: Box<Self>,
135        initial_ts: Timestamp,
136        bootstrap_args: &BootstrapArgs,
137    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
138
139    /// Opens the catalog for manual editing of the underlying data. This is helpful for
140    /// fixing a corrupt catalog.
141    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
142
143    /// Reports if the catalog state has been initialized.
144    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
145
146    /// Returns the epoch of the current durable catalog state. The epoch acts as
147    /// a fencing token to prevent split brain issues across two
148    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
149    /// catalog, it will increment the epoch by one (or initialize it to some
150    /// value if there's no existing epoch) and store the value in memory. It's
151    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
152    /// for their epoch.
153    ///
154    /// NB: We may remove this in later iterations of Pv2.
155    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
156
157    /// Get the most recent deployment generation written to the catalog. Not necessarily the
158    /// deploy generation of this instance.
159    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
160
161    /// Get the `with_0dt_deployment_max_wait` config value of this instance.
162    ///
163    /// This mirrors the `with_0dt_deployment_max_wait` "system var" so that we can
164    /// toggle the flag with LaunchDarkly, but use it in boot before
165    /// LaunchDarkly is available.
166    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
167
168    /// Get the `with_0dt_deployment_ddl_check_interval` config value of this instance.
169    ///
170    /// This mirrors the `with_0dt_deployment_ddl_check_interval` "system var" so that we can
171    /// toggle the flag with LaunchDarkly, but use it in boot before
172    /// LaunchDarkly is available.
173    async fn get_0dt_deployment_ddl_check_interval(
174        &mut self,
175    ) -> Result<Option<Duration>, CatalogError>;
176
177    /// Get the `enable_0dt_deployment_panic_after_timeout` config value of this
178    /// instance.
179    ///
180    /// This mirrors the `enable_0dt_deployment_panic_after_timeout` "system var"
181    /// so that we can toggle the flag with LaunchDarkly, but use it in boot
182    /// before LaunchDarkly is available.
183    async fn get_enable_0dt_deployment_panic_after_timeout(
184        &mut self,
185    ) -> Result<Option<bool>, CatalogError>;
186
187    /// Reports if the remote configuration was synchronized at least once.
188    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
189
190    /// Generate an unconsolidated [`Trace`] of catalog contents.
191    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
192
193    /// Generate a consolidated [`Trace`] of catalog contents.
194    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
195
196    /// Politely releases all external resources that can only be released in an async context.
197    async fn expire(self: Box<Self>);
198}
199
200/// A read only API for the durable catalog state.
201#[async_trait]
202pub trait ReadOnlyDurableCatalogState: Debug + Send {
203    /// Returns the epoch of the current durable catalog state. The epoch acts as
204    /// a fencing token to prevent split brain issues across two
205    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
206    /// catalog, it will increment the epoch by one (or initialize it to some
207    /// value if there's no existing epoch) and store the value in memory. It's
208    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
209    /// for their epoch.
210    ///
211    /// NB: We may remove this in later iterations of Pv2.
212    fn epoch(&self) -> Epoch;
213
214    /// Politely releases all external resources that can only be released in an async context.
215    async fn expire(self: Box<Self>);
216
217    /// Returns true if the system bootstrapping process is complete, false otherwise.
218    fn is_bootstrap_complete(&self) -> bool;
219
220    /// Get all audit log events.
221    ///
222    /// Results are guaranteed to be sorted by ID.
223    ///
224    /// WARNING: This is meant for use in integration tests and has bad performance.
225    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
226
227    /// Get the next ID of `id_type`, without allocating it.
228    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
229
230    /// Get the next user ID without allocating it.
231    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
232        self.get_next_id(USER_ITEM_ALLOC_KEY).await
233    }
234
235    /// Get the next system ID without allocating it.
236    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
237        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
238    }
239
240    /// Get the next system replica id without allocating it.
241    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
242        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
243    }
244
245    /// Get the next user replica id without allocating it.
246    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
247        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
248    }
249
250    /// Get the deployment generation of this instance.
251    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
252
253    /// Get a snapshot of the catalog.
254    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
255
256    /// Listen and return all updates that are currently in the catalog.
257    ///
258    /// IMPORTANT: This excludes updates to storage usage.
259    ///
260    /// Returns an error if this instance has been fenced out.
261    async fn sync_to_current_updates(
262        &mut self,
263    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
264
265    // TODO(jkosh44) The fact that the timestamp argument is an exclusive upper bound makes
266    // it difficult to use for readers. For now it's correct and easy to implement, but we should
267    // consider a better API.
268    /// Listen and return all updates in the catalog up to `target_upper`.
269    ///
270    /// IMPORTANT: This excludes updates to storage usage.
271    ///
272    /// Returns an error if this instance has been fenced out.
273    async fn sync_updates(
274        &mut self,
275        target_upper: Timestamp,
276    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
277
278    /// Fetch the current upper of the catalog state.
279    async fn current_upper(&mut self) -> Timestamp;
280}
281
282/// A read-write API for the durable catalog state.
283#[async_trait]
284#[allow(mismatched_lifetime_syntaxes)]
285pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
286    /// Returns true if the catalog is opened in read only mode, false otherwise.
287    fn is_read_only(&self) -> bool;
288
289    /// Returns true if the catalog is opened is savepoint mode, false otherwise.
290    fn is_savepoint(&self) -> bool;
291
292    /// Marks the bootstrap process as complete.
293    fn mark_bootstrap_complete(&mut self);
294
295    /// Creates a new durable catalog state transaction.
296    async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
297
298    /// Commits a durable catalog state transaction. The transaction will be committed at
299    /// `commit_ts`.
300    ///
301    /// Returns what the upper was directly after the transaction committed.
302    ///
303    /// Panics if `commit_ts` is not greater than or equal to the most recent upper seen by this
304    /// process.
305    async fn commit_transaction(
306        &mut self,
307        txn_batch: TransactionBatch,
308        commit_ts: Timestamp,
309    ) -> Result<Timestamp, CatalogError>;
310
311    /// Confirms that this catalog is connected as the current leader.
312    ///
313    /// NB: We may remove this in later iterations of Pv2.
314    async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
315
316    /// Allocates and returns `amount` IDs of `id_type`.
317    ///
318    /// See [`Self::commit_transaction`] for details on `commit_ts`.
319    #[mz_ore::instrument(level = "debug")]
320    async fn allocate_id(
321        &mut self,
322        id_type: &str,
323        amount: u64,
324        commit_ts: Timestamp,
325    ) -> Result<Vec<u64>, CatalogError> {
326        if amount == 0 {
327            return Ok(Vec::new());
328        }
329        let mut txn = self.transaction().await?;
330        let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
331        txn.commit_internal(commit_ts).await?;
332        Ok(ids)
333    }
334
335    /// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
336    ///
337    /// See [`Self::commit_transaction`] for details on `commit_ts`.
338    async fn allocate_user_ids(
339        &mut self,
340        amount: u64,
341        commit_ts: Timestamp,
342    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
343        let ids = self
344            .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
345            .await?;
346        let ids = ids
347            .iter()
348            .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
349            .collect();
350        Ok(ids)
351    }
352
353    /// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
354    ///
355    /// See [`Self::commit_transaction`] for details on `commit_ts`.
356    async fn allocate_user_id(
357        &mut self,
358        commit_ts: Timestamp,
359    ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
360        let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
361        let id = id.into_element();
362        Ok((CatalogItemId::User(id), GlobalId::User(id)))
363    }
364
365    /// Allocates and returns a user [`ClusterId`].
366    ///
367    /// See [`Self::commit_transaction`] for details on `commit_ts`.
368    async fn allocate_user_cluster_id(
369        &mut self,
370        commit_ts: Timestamp,
371    ) -> Result<ClusterId, CatalogError> {
372        let id = self
373            .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
374            .await?
375            .into_element();
376        Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
377    }
378}
379
380trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
381impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
382
383/// An iterator that returns audit log events in reverse ID order.
384#[derive(Debug)]
385pub struct AuditLogIterator {
386    // We store an interator instead of a sorted `Vec`, so we can lazily sort the contents on the
387    // first call to `next`, instead of sorting the contents on initialization.
388    audit_logs: Box<dyn AuditLogIteratorTrait>,
389}
390
391impl AuditLogIterator {
392    fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
393        let audit_logs = audit_logs
394            .into_iter()
395            .map(|(kind, ts, diff)| {
396                assert_eq!(
397                    diff,
398                    Diff::ONE,
399                    "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
400                );
401                assert!(
402                    kind.is_audit_log(),
403                    "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
404                );
405                let id = kind.audit_log_id();
406                (kind, ts, id)
407            })
408            .sorted_by_key(|(_, ts, id)| (*ts, *id))
409            .map(|(kind, ts, _id)| (kind, ts))
410            .rev()
411            .map(|(kind, ts)| {
412                // Each event will be deserialized lazily on a call to `next`.
413                let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
414                let kind: Option<memory::objects::StateUpdateKind> = (&kind)
415                    .try_into()
416                    .expect("invalid persisted update: {update:#?}");
417                let kind = kind.expect("audit log always produces im-memory updates");
418                let audit_log = match kind {
419                    memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
420                    kind => unreachable!("invalid kind: {kind:?}"),
421                };
422                (audit_log, ts)
423            });
424        Self {
425            audit_logs: Box::new(audit_logs),
426        }
427    }
428}
429
430impl Iterator for AuditLogIterator {
431    type Item = (AuditLog, Timestamp);
432
433    fn next(&mut self) -> Option<Self::Item> {
434        self.audit_logs.next()
435    }
436}
437
438/// A builder to help create an [`OpenableDurableCatalogState`] for tests.
439#[derive(Debug, Clone)]
440pub struct TestCatalogStateBuilder {
441    persist_client: PersistClient,
442    organization_id: Uuid,
443    version: semver::Version,
444    deploy_generation: Option<u64>,
445    metrics: Arc<Metrics>,
446}
447
448impl TestCatalogStateBuilder {
449    pub fn new(persist_client: PersistClient) -> Self {
450        Self {
451            persist_client,
452            organization_id: Uuid::new_v4(),
453            version: semver::Version::new(0, 0, 0),
454            deploy_generation: None,
455            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
456        }
457    }
458
459    pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
460        self.organization_id = organization_id;
461        self
462    }
463
464    pub fn with_version(mut self, version: semver::Version) -> Self {
465        self.version = version;
466        self
467    }
468
469    pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
470        self.deploy_generation = Some(deploy_generation);
471        self
472    }
473
474    pub fn with_default_deploy_generation(self) -> Self {
475        self.with_deploy_generation(0)
476    }
477
478    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
479        self.metrics = metrics;
480        self
481    }
482
483    pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
484        persist_backed_catalog_state(
485            self.persist_client,
486            self.organization_id,
487            self.version,
488            self.deploy_generation,
489            self.metrics,
490        )
491        .await
492    }
493
494    pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
495        self.expect_build("failed to build").await
496    }
497
498    pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
499        self.build().await.expect(msg)
500    }
501}
502
503/// Creates an openable durable catalog state implemented using persist.
504///
505/// `deploy_generation` MUST be `Some` to initialize a new catalog.
506pub async fn persist_backed_catalog_state(
507    persist_client: PersistClient,
508    organization_id: Uuid,
509    version: semver::Version,
510    deploy_generation: Option<u64>,
511    metrics: Arc<Metrics>,
512) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
513    let state = UnopenedPersistCatalogState::new(
514        persist_client,
515        organization_id,
516        version,
517        deploy_generation,
518        metrics,
519    )
520    .await?;
521    Ok(Box::new(state))
522}
523
524pub fn test_bootstrap_args() -> BootstrapArgs {
525    BootstrapArgs {
526        default_cluster_replica_size: "scale=1,workers=1".into(),
527        default_cluster_replication_factor: 1,
528        bootstrap_role: None,
529        cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
530    }
531}