mz_catalog/
durable.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! This module is responsible for durably storing and modifying the catalog contents.
11
12use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::ClusterId;
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_repr::{CatalogItemId, Diff, GlobalId};
25use mz_sql::catalog::CatalogError as SqlCatalogError;
26use uuid::Uuid;
27
28use crate::config::ClusterReplicaSizeMap;
29use crate::durable::debug::{DebugCatalogState, Trace};
30pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
31pub use crate::durable::metrics::Metrics;
32pub use crate::durable::objects::state_update::StateUpdate;
33use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34use crate::durable::objects::{AuditLog, Snapshot};
35pub use crate::durable::objects::{
36    Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
37    Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
38    ReplicaLocation, Role, RoleAuth, Schema, SourceReference, SourceReferences,
39    StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
40    UnfinalizedShard,
41};
42pub use crate::durable::persist::shard_id;
43use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
44pub use crate::durable::transaction::Transaction;
45use crate::durable::transaction::TransactionBatch;
46pub use crate::durable::upgrade::CATALOG_VERSION;
47use crate::memory;
48
49pub mod debug;
50mod error;
51pub mod initialize;
52mod metrics;
53pub mod objects;
54mod persist;
55mod traits;
56mod transaction;
57mod upgrade;
58
// Keys naming the ID allocators that are passed as `id_type` to `get_next_id` /
// `allocate_id`.
// NOTE(review): these strings are presumably stored durably, so the values must never change,
// even where they do not match the constant's name (e.g. `"user_compute"`, `"replica"`).

/// ID allocator key for database IDs.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
/// ID allocator key for schema IDs.
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// ID allocator key for user item IDs (both `CatalogItemId::User` and `GlobalId::User`).
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// ID allocator key for system item IDs.
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
/// ID allocator key for user role IDs.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// ID allocator key for user cluster IDs.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
/// ID allocator key for system cluster IDs.
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// ID allocator key for user cluster replica IDs.
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// ID allocator key for system cluster replica IDs.
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
/// ID allocator key for audit log event IDs.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
/// ID allocator key for storage usage event IDs.
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
/// ID allocator key for user network policy IDs.
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
/// ID allocator key for OIDs.
pub const OID_ALLOC_KEY: &str = "oid";

// Keys that do not name ID allocators.
// NOTE(review): presumably keys into a durable config/settings collection — confirm.

/// Key under which the catalog content version is stored.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
/// Key under which the builtin migration shard is recorded.
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
/// Key under which the expression cache shard is recorded.
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
/// Key under which the mock authentication nonce is recorded.
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
76
77#[derive(Clone, Debug)]
78pub struct BootstrapArgs {
79    pub cluster_replica_size_map: ClusterReplicaSizeMap,
80    pub default_cluster_replica_size: String,
81    pub default_cluster_replication_factor: u32,
82    pub bootstrap_role: Option<String>,
83}
84
/// Fencing token of a durable catalog state; see [`OpenableDurableCatalogState::epoch`].
///
/// Backed by a non-zero integer, so `Option<Epoch>` is no larger than `Epoch` itself.
pub type Epoch = NonZeroI64;
86
87/// An API for opening a durable catalog state.
88///
89/// If a catalog is not opened, then resources should be release via [`Self::expire`].
90#[async_trait]
91pub trait OpenableDurableCatalogState: Debug + Send {
92    // TODO(jkosh44) Teaching savepoint mode how to listen to additional
93    // durable updates will be necessary for zero down time upgrades.
94    /// Opens the catalog in a mode that accepts and buffers all writes,
95    /// but never durably commits them. This is used to check and see if
96    /// opening the catalog would be successful, without making any durable
97    /// changes.
98    ///
99    /// Once a savepoint catalog reads an initial snapshot from durable
100    /// storage, it will never read another update from durable storage. As a
101    /// consequence, savepoint catalogs can never be fenced.
102    ///
103    /// Will return an error in the following scenarios:
104    ///   - Catalog initialization fails.
105    ///   - Catalog migrations fail.
106    ///
107    /// `initial_ts` is used as the initial timestamp for new environments.
108    ///
109    /// Also returns a handle to a thread that is deserializing all of the audit logs.
110    async fn open_savepoint(
111        mut self: Box<Self>,
112        initial_ts: Timestamp,
113        bootstrap_args: &BootstrapArgs,
114    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
115
116    /// Opens the catalog in read only mode. All mutating methods
117    /// will return an error.
118    ///
119    /// If the catalog is uninitialized or requires a migrations, then
120    /// it will fail to open in read only mode.
121    async fn open_read_only(
122        mut self: Box<Self>,
123        bootstrap_args: &BootstrapArgs,
124    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
125
126    /// Opens the catalog in a writeable mode. Optionally initializes the
127    /// catalog, if it has not been initialized, and perform any migrations
128    /// needed.
129    ///
130    /// `initial_ts` is used as the initial timestamp for new environments.
131    ///
132    /// Also returns a handle to a thread that is deserializing all of the audit logs.
133    async fn open(
134        mut self: Box<Self>,
135        initial_ts: Timestamp,
136        bootstrap_args: &BootstrapArgs,
137    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
138
139    /// Opens the catalog for manual editing of the underlying data. This is helpful for
140    /// fixing a corrupt catalog.
141    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
142
143    /// Reports if the catalog state has been initialized.
144    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
145
146    /// Returns the epoch of the current durable catalog state. The epoch acts as
147    /// a fencing token to prevent split brain issues across two
148    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
149    /// catalog, it will increment the epoch by one (or initialize it to some
150    /// value if there's no existing epoch) and store the value in memory. It's
151    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
152    /// for their epoch.
153    ///
154    /// NB: We may remove this in later iterations of Pv2.
155    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
156
157    /// Get the most recent deployment generation written to the catalog. Not necessarily the
158    /// deploy generation of this instance.
159    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
160
161    /// Get the `with_0dt_deployment_max_wait` config value of this instance.
162    ///
163    /// This mirrors the `with_0dt_deployment_max_wait` "system var" so that we can
164    /// toggle the flag with LaunchDarkly, but use it in boot before
165    /// LaunchDarkly is available.
166    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
167
168    /// Get the `with_0dt_deployment_ddl_check_interval` config value of this instance.
169    ///
170    /// This mirrors the `with_0dt_deployment_ddl_check_interval` "system var" so that we can
171    /// toggle the flag with LaunchDarkly, but use it in boot before
172    /// LaunchDarkly is available.
173    async fn get_0dt_deployment_ddl_check_interval(
174        &mut self,
175    ) -> Result<Option<Duration>, CatalogError>;
176
177    /// Get the `enable_0dt_deployment_panic_after_timeout` config value of this
178    /// instance.
179    ///
180    /// This mirrors the `enable_0dt_deployment_panic_after_timeout` "system var"
181    /// so that we can toggle the flag with LaunchDarkly, but use it in boot
182    /// before LaunchDarkly is available.
183    async fn get_enable_0dt_deployment_panic_after_timeout(
184        &mut self,
185    ) -> Result<Option<bool>, CatalogError>;
186
187    /// Reports if the remote configuration was synchronized at least once.
188    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
189
190    /// Generate an unconsolidated [`Trace`] of catalog contents.
191    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
192
193    /// Generate a consolidated [`Trace`] of catalog contents.
194    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
195
196    /// Politely releases all external resources that can only be released in an async context.
197    async fn expire(self: Box<Self>);
198}
199
200/// A read only API for the durable catalog state.
201#[async_trait]
202pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
203    /// Returns the epoch of the current durable catalog state. The epoch acts as
204    /// a fencing token to prevent split brain issues across two
205    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
206    /// catalog, it will increment the epoch by one (or initialize it to some
207    /// value if there's no existing epoch) and store the value in memory. It's
208    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
209    /// for their epoch.
210    ///
211    /// NB: We may remove this in later iterations of Pv2.
212    fn epoch(&self) -> Epoch;
213
214    /// Returns the metrics for this catalog state.
215    fn metrics(&self) -> &Metrics;
216
217    /// Politely releases all external resources that can only be released in an async context.
218    async fn expire(self: Box<Self>);
219
220    /// Returns true if the system bootstrapping process is complete, false otherwise.
221    fn is_bootstrap_complete(&self) -> bool;
222
223    /// Get all audit log events.
224    ///
225    /// Results are guaranteed to be sorted by ID.
226    ///
227    /// WARNING: This is meant for use in integration tests and has bad performance.
228    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
229
230    /// Get the next ID of `id_type`, without allocating it.
231    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
232
233    /// Get the next user ID without allocating it.
234    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
235        self.get_next_id(USER_ITEM_ALLOC_KEY).await
236    }
237
238    /// Get the next system ID without allocating it.
239    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
240        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
241    }
242
243    /// Get the next system replica id without allocating it.
244    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
245        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
246    }
247
248    /// Get the next user replica id without allocating it.
249    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
250        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
251    }
252
253    /// Get the deployment generation of this instance.
254    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
255
256    /// Get a snapshot of the catalog.
257    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
258
259    /// Listen and return all updates that are currently in the catalog.
260    ///
261    /// IMPORTANT: This excludes updates to storage usage.
262    ///
263    /// Returns an error if this instance has been fenced out.
264    async fn sync_to_current_updates(
265        &mut self,
266    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
267
268    // TODO(jkosh44) The fact that the timestamp argument is an exclusive upper bound makes
269    // it difficult to use for readers. For now it's correct and easy to implement, but we should
270    // consider a better API.
271    /// Listen and return all updates in the catalog up to `target_upper`.
272    ///
273    /// IMPORTANT: This excludes updates to storage usage.
274    ///
275    /// Returns an error if this instance has been fenced out.
276    async fn sync_updates(
277        &mut self,
278        target_upper: Timestamp,
279    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
280
281    /// Fetch the current upper of the catalog state.
282    async fn current_upper(&mut self) -> Timestamp;
283}
284
285/// A read-write API for the durable catalog state.
286#[async_trait]
287#[allow(mismatched_lifetime_syntaxes)]
288pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
289    /// Returns true if the catalog is opened in read only mode, false otherwise.
290    fn is_read_only(&self) -> bool;
291
292    /// Returns true if the catalog is opened is savepoint mode, false otherwise.
293    fn is_savepoint(&self) -> bool;
294
295    /// Marks the bootstrap process as complete.
296    async fn mark_bootstrap_complete(&mut self);
297
298    /// Creates a new durable catalog state transaction.
299    async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
300
301    /// Commits a durable catalog state transaction. The transaction will be committed at
302    /// `commit_ts`.
303    ///
304    /// Returns what the upper was directly after the transaction committed.
305    ///
306    /// Panics if `commit_ts` is not greater than or equal to the most recent upper seen by this
307    /// process.
308    async fn commit_transaction(
309        &mut self,
310        txn_batch: TransactionBatch,
311        commit_ts: Timestamp,
312    ) -> Result<Timestamp, CatalogError>;
313
314    /// Confirms that this catalog is connected as the current leader.
315    ///
316    /// NB: We may remove this in later iterations of Pv2.
317    async fn confirm_leadership(&mut self) -> Result<(), CatalogError>;
318
319    /// Allocates and returns `amount` IDs of `id_type`.
320    ///
321    /// See [`Self::commit_transaction`] for details on `commit_ts`.
322    #[mz_ore::instrument(level = "debug")]
323    async fn allocate_id(
324        &mut self,
325        id_type: &str,
326        amount: u64,
327        commit_ts: Timestamp,
328    ) -> Result<Vec<u64>, CatalogError> {
329        let start = Instant::now();
330        if amount == 0 {
331            return Ok(Vec::new());
332        }
333        let mut txn = self.transaction().await?;
334        let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
335        txn.commit_internal(commit_ts).await?;
336        self.metrics()
337            .allocate_id_seconds
338            .observe(start.elapsed().as_secs_f64());
339        Ok(ids)
340    }
341
342    /// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
343    ///
344    /// See [`Self::commit_transaction`] for details on `commit_ts`.
345    async fn allocate_user_ids(
346        &mut self,
347        amount: u64,
348        commit_ts: Timestamp,
349    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
350        let ids = self
351            .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
352            .await?;
353        let ids = ids
354            .iter()
355            .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
356            .collect();
357        Ok(ids)
358    }
359
360    /// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
361    ///
362    /// See [`Self::commit_transaction`] for details on `commit_ts`.
363    async fn allocate_user_id(
364        &mut self,
365        commit_ts: Timestamp,
366    ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
367        let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
368        let id = id.into_element();
369        Ok((CatalogItemId::User(id), GlobalId::User(id)))
370    }
371
372    /// Allocates and returns a user [`ClusterId`].
373    ///
374    /// See [`Self::commit_transaction`] for details on `commit_ts`.
375    async fn allocate_user_cluster_id(
376        &mut self,
377        commit_ts: Timestamp,
378    ) -> Result<ClusterId, CatalogError> {
379        let id = self
380            .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
381            .await?
382            .into_element();
383        Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
384    }
385}
386
387trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
388impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
389
390/// An iterator that returns audit log events in reverse ID order.
391#[derive(Debug)]
392pub struct AuditLogIterator {
393    // We store an interator instead of a sorted `Vec`, so we can lazily sort the contents on the
394    // first call to `next`, instead of sorting the contents on initialization.
395    audit_logs: Box<dyn AuditLogIteratorTrait>,
396}
397
398impl AuditLogIterator {
399    fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
400        let audit_logs = audit_logs
401            .into_iter()
402            .map(|(kind, ts, diff)| {
403                assert_eq!(
404                    diff,
405                    Diff::ONE,
406                    "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
407                );
408                assert!(
409                    kind.is_audit_log(),
410                    "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
411                );
412                let id = kind.audit_log_id();
413                (kind, ts, id)
414            })
415            .sorted_by_key(|(_, ts, id)| (*ts, *id))
416            .map(|(kind, ts, _id)| (kind, ts))
417            .rev()
418            .map(|(kind, ts)| {
419                // Each event will be deserialized lazily on a call to `next`.
420                let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
421                let kind: Option<memory::objects::StateUpdateKind> = (&kind)
422                    .try_into()
423                    .expect("invalid persisted update: {update:#?}");
424                let kind = kind.expect("audit log always produces im-memory updates");
425                let audit_log = match kind {
426                    memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
427                    kind => unreachable!("invalid kind: {kind:?}"),
428                };
429                (audit_log, ts)
430            });
431        Self {
432            audit_logs: Box::new(audit_logs),
433        }
434    }
435}
436
437impl Iterator for AuditLogIterator {
438    type Item = (AuditLog, Timestamp);
439
440    fn next(&mut self) -> Option<Self::Item> {
441        self.audit_logs.next()
442    }
443}
444
445/// A builder to help create an [`OpenableDurableCatalogState`] for tests.
446#[derive(Debug, Clone)]
447pub struct TestCatalogStateBuilder {
448    persist_client: PersistClient,
449    organization_id: Uuid,
450    version: semver::Version,
451    deploy_generation: Option<u64>,
452    metrics: Arc<Metrics>,
453}
454
455impl TestCatalogStateBuilder {
456    pub fn new(persist_client: PersistClient) -> Self {
457        Self {
458            persist_client,
459            organization_id: Uuid::new_v4(),
460            version: semver::Version::new(0, 0, 0),
461            deploy_generation: None,
462            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
463        }
464    }
465
466    pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
467        self.organization_id = organization_id;
468        self
469    }
470
471    pub fn with_version(mut self, version: semver::Version) -> Self {
472        self.version = version;
473        self
474    }
475
476    pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
477        self.deploy_generation = Some(deploy_generation);
478        self
479    }
480
481    pub fn with_default_deploy_generation(self) -> Self {
482        self.with_deploy_generation(0)
483    }
484
485    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
486        self.metrics = metrics;
487        self
488    }
489
490    pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
491        persist_backed_catalog_state(
492            self.persist_client,
493            self.organization_id,
494            self.version,
495            self.deploy_generation,
496            self.metrics,
497        )
498        .await
499    }
500
501    pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
502        self.expect_build("failed to build").await
503    }
504
505    pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
506        self.build().await.expect(msg)
507    }
508}
509
510/// Creates an openable durable catalog state implemented using persist.
511///
512/// `deploy_generation` MUST be `Some` to initialize a new catalog.
513pub async fn persist_backed_catalog_state(
514    persist_client: PersistClient,
515    organization_id: Uuid,
516    version: semver::Version,
517    deploy_generation: Option<u64>,
518    metrics: Arc<Metrics>,
519) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
520    let state = UnopenedPersistCatalogState::new(
521        persist_client,
522        organization_id,
523        version,
524        deploy_generation,
525        metrics,
526    )
527    .await?;
528    Ok(Box::new(state))
529}
530
531pub fn test_bootstrap_args() -> BootstrapArgs {
532    BootstrapArgs {
533        default_cluster_replica_size: "scale=1,workers=1".into(),
534        default_cluster_replication_factor: 1,
535        bootstrap_role: None,
536        cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
537    }
538}