Skip to main content

mz_catalog/
durable.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! This crate is responsible for durably storing and modifying the catalog contents.
11
12use std::fmt::Debug;
13use std::num::NonZeroI64;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use async_trait::async_trait;
18use itertools::Itertools;
19use mz_audit_log::VersionedEvent;
20use mz_controller_types::{ClusterId, ReplicaId};
21use mz_ore::collections::CollectionExt;
22use mz_ore::metrics::MetricsRegistry;
23use mz_persist_client::PersistClient;
24use mz_persist_types::ShardId;
25use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, SqlScalarType};
26use mz_sql::catalog::CatalogError as SqlCatalogError;
27use uuid::Uuid;
28
29use crate::config::ClusterReplicaSizeMap;
30use crate::durable::debug::{DebugCatalogState, Trace};
31pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
32pub use crate::durable::metrics::Metrics;
33use crate::durable::objects::AuditLog;
34pub use crate::durable::objects::Snapshot;
35pub use crate::durable::objects::state_update::StateUpdate;
36use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
37pub use crate::durable::objects::{
38    BurstState, Cluster, ClusterConfig, ClusterReplica, ClusterSystemConfiguration, ClusterVariant,
39    ClusterVariantManaged, Comment, Database, DefaultPrivilege, IntrospectionSourceIndex, Item,
40    NetworkPolicy, ReconfigurationState, ReconfigurationTarget, ReplicaConfig, ReplicaLocation,
41    ReplicaSystemConfiguration, Role, RoleAuth, Schema, SourceReference, SourceReferences,
42    StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping,
43    UnfinalizedShard,
44};
45pub use crate::durable::persist::shard_id;
46use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
47pub use crate::durable::transaction::Transaction;
48use crate::durable::transaction::TransactionBatch;
49pub use crate::durable::upgrade::CATALOG_VERSION;
50use crate::memory;
51
52pub mod debug;
53mod error;
54pub mod initialize;
55mod metrics;
56pub mod objects;
57mod persist;
58mod traits;
59mod transaction;
60mod upgrade;
61
// Keys naming the individual ID allocators. Keys of this kind are passed as the `id_type`
// argument of `ReadOnlyDurableCatalogState::get_next_id` and
// `DurableCatalogState::allocate_id`.
//
// NOTE(review): the one-line summaries on keys that are not referenced in this file are
// derived from the constant names only -- confirm against the call sites.
// NOTE(review): these strings are presumably persisted in the durable catalog, so changing a
// value would likely need a migration -- confirm before editing.

/// Allocator key for database IDs.
pub const DATABASE_ID_ALLOC_KEY: &str = "database";
/// Allocator key for schema IDs.
pub const SCHEMA_ID_ALLOC_KEY: &str = "schema";
/// Allocator key for user item IDs; used by `get_next_user_item_id` and the
/// `allocate_user_id*` methods.
pub const USER_ITEM_ALLOC_KEY: &str = "user";
/// Allocator key for system item IDs; used by `get_next_system_item_id`.
pub const SYSTEM_ITEM_ALLOC_KEY: &str = "system";
/// Allocator key for user role IDs.
pub const USER_ROLE_ID_ALLOC_KEY: &str = "user_role";
/// Allocator key for user cluster IDs; used by `allocate_user_cluster_id`.
pub const USER_CLUSTER_ID_ALLOC_KEY: &str = "user_compute";
/// Allocator key for system cluster IDs.
pub const SYSTEM_CLUSTER_ID_ALLOC_KEY: &str = "system_compute";
/// Allocator key for user replica IDs; used by `get_next_user_replica_id` and
/// `allocate_user_replica_ids`.
pub const USER_REPLICA_ID_ALLOC_KEY: &str = "replica";
/// Allocator key for system replica IDs; used by `get_next_system_replica_id` and
/// `allocate_system_replica_ids`.
pub const SYSTEM_REPLICA_ID_ALLOC_KEY: &str = "system_replica";
/// Allocator key for audit log event IDs.
pub const AUDIT_LOG_ID_ALLOC_KEY: &str = "auditlog";
/// Allocator key for storage usage event IDs.
pub const STORAGE_USAGE_ID_ALLOC_KEY: &str = "storage_usage";
/// Allocator key for user network policy IDs.
pub const USER_NETWORK_POLICY_ID_ALLOC_KEY: &str = "user_network_policy";
/// Allocator key for OIDs.
pub const OID_ALLOC_KEY: &str = "oid";

// The remaining keys are not ID allocators and are not referenced elsewhere in this file.
// NOTE(review): presumably they name entries in a catalog settings/config collection --
// confirm against their users.

/// Key for the catalog content version.
pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";
/// Key for the builtin migration shard.
pub const BUILTIN_MIGRATION_SHARD_KEY: &str = "builtin_migration_shard";
/// Key for the expression cache shard.
pub const EXPRESSION_CACHE_SHARD_KEY: &str = "expression_cache_shard";
/// Key for the mock authentication nonce.
pub const MOCK_AUTHENTICATION_NONCE_KEY: &str = "mock_authentication_nonce";
79
80#[derive(Clone, Debug)]
81pub struct BootstrapArgs {
82    pub cluster_replica_size_map: ClusterReplicaSizeMap,
83    pub default_cluster_replica_size: String,
84    pub default_cluster_replication_factor: u32,
85    pub bootstrap_role: Option<String>,
86}
87
/// The fencing token of a durable catalog state; always non-zero.
///
/// See the `epoch` methods on [`OpenableDurableCatalogState`] and
/// [`ReadOnlyDurableCatalogState`] for how the value is assigned and used.
pub type Epoch = NonZeroI64;
89
90/// An API for opening a durable catalog state.
91///
92/// If a catalog is not opened, then resources should be release via [`Self::expire`].
93#[async_trait]
94pub trait OpenableDurableCatalogState: Debug + Send {
95    // TODO(jkosh44) Teaching savepoint mode how to listen to additional
96    // durable updates will be necessary for zero down time upgrades.
97    /// Opens the catalog in a mode that accepts and buffers all writes,
98    /// but never durably commits them. This is used to check and see if
99    /// opening the catalog would be successful, without making any durable
100    /// changes.
101    ///
102    /// Once a savepoint catalog reads an initial snapshot from durable
103    /// storage, it will never read another update from durable storage. As a
104    /// consequence, savepoint catalogs can never be fenced.
105    ///
106    /// Will return an error in the following scenarios:
107    ///   - Catalog initialization fails.
108    ///   - Catalog migrations fail.
109    ///
110    /// `initial_ts` is used as the initial timestamp for new environments.
111    ///
112    /// Also returns a handle to a thread that is deserializing all of the audit logs.
113    async fn open_savepoint(
114        mut self: Box<Self>,
115        initial_ts: Timestamp,
116        bootstrap_args: &BootstrapArgs,
117    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
118
119    /// Opens the catalog in read only mode. All mutating methods
120    /// will return an error.
121    ///
122    /// If the catalog is uninitialized or requires a migrations, then
123    /// it will fail to open in read only mode.
124    async fn open_read_only(
125        mut self: Box<Self>,
126        bootstrap_args: &BootstrapArgs,
127    ) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
128
129    /// Opens the catalog in a writeable mode. Optionally initializes the
130    /// catalog, if it has not been initialized, and perform any migrations
131    /// needed.
132    ///
133    /// `initial_ts` is used as the initial timestamp for new environments.
134    ///
135    /// Also returns a handle to a thread that is deserializing all of the audit logs.
136    async fn open(
137        mut self: Box<Self>,
138        initial_ts: Timestamp,
139        bootstrap_args: &BootstrapArgs,
140    ) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;
141
142    /// Opens the catalog for manual editing of the underlying data. This is helpful for
143    /// fixing a corrupt catalog.
144    async fn open_debug(mut self: Box<Self>) -> Result<DebugCatalogState, CatalogError>;
145
146    /// Reports if the catalog state has been initialized.
147    async fn is_initialized(&mut self) -> Result<bool, CatalogError>;
148
149    /// Returns the epoch of the current durable catalog state. The epoch acts as
150    /// a fencing token to prevent split brain issues across two
151    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
152    /// catalog, it will increment the epoch by one (or initialize it to some
153    /// value if there's no existing epoch) and store the value in memory. It's
154    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
155    /// for their epoch.
156    ///
157    /// NB: We may remove this in later iterations of Pv2.
158    async fn epoch(&mut self) -> Result<Epoch, CatalogError>;
159
160    /// Get the most recent deployment generation written to the catalog. Not necessarily the
161    /// deploy generation of this instance.
162    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
163
164    /// Get the `with_0dt_deployment_max_wait` config value of this instance.
165    ///
166    /// This mirrors the `with_0dt_deployment_max_wait` "system var" so that we can
167    /// toggle the flag with LaunchDarkly, but use it in boot before
168    /// LaunchDarkly is available.
169    async fn get_0dt_deployment_max_wait(&mut self) -> Result<Option<Duration>, CatalogError>;
170
171    /// Get the `with_0dt_deployment_ddl_check_interval` config value of this instance.
172    ///
173    /// This mirrors the `with_0dt_deployment_ddl_check_interval` "system var" so that we can
174    /// toggle the flag with LaunchDarkly, but use it in boot before
175    /// LaunchDarkly is available.
176    async fn get_0dt_deployment_ddl_check_interval(
177        &mut self,
178    ) -> Result<Option<Duration>, CatalogError>;
179
180    /// Get the `enable_0dt_deployment_panic_after_timeout` config value of this
181    /// instance.
182    ///
183    /// This mirrors the `enable_0dt_deployment_panic_after_timeout` "system var"
184    /// so that we can toggle the flag with LaunchDarkly, but use it in boot
185    /// before LaunchDarkly is available.
186    async fn get_enable_0dt_deployment_panic_after_timeout(
187        &mut self,
188    ) -> Result<Option<bool>, CatalogError>;
189
190    /// Reports if the remote configuration was synchronized at least once.
191    async fn has_system_config_synced_once(&mut self) -> Result<bool, DurableCatalogError>;
192
193    /// Generate an unconsolidated [`Trace`] of catalog contents.
194    async fn trace_unconsolidated(&mut self) -> Result<Trace, CatalogError>;
195
196    /// Generate a consolidated [`Trace`] of catalog contents.
197    async fn trace_consolidated(&mut self) -> Result<Trace, CatalogError>;
198
199    /// Politely releases all external resources that can only be released in an async context.
200    async fn expire(self: Box<Self>);
201}
202
203/// A read only API for the durable catalog state.
204#[async_trait]
205pub trait ReadOnlyDurableCatalogState: Debug + Send + Sync {
206    /// Returns the epoch of the current durable catalog state. The epoch acts as
207    /// a fencing token to prevent split brain issues across two
208    /// [`DurableCatalogState`]s. When a new [`DurableCatalogState`] opens the
209    /// catalog, it will increment the epoch by one (or initialize it to some
210    /// value if there's no existing epoch) and store the value in memory. It's
211    /// guaranteed that no two [`DurableCatalogState`]s will return the same value
212    /// for their epoch.
213    ///
214    /// NB: We may remove this in later iterations of Pv2.
215    fn epoch(&self) -> Epoch;
216
217    /// Returns the metrics for this catalog state.
218    fn metrics(&self) -> &Metrics;
219
220    /// Politely releases all external resources that can only be released in an async context.
221    async fn expire(self: Box<Self>);
222
223    /// Returns true if the system bootstrapping process is complete, false otherwise.
224    fn is_bootstrap_complete(&self) -> bool;
225
226    /// Get all audit log events.
227    ///
228    /// Results are guaranteed to be sorted by ID.
229    ///
230    /// WARNING: This is meant for use in integration tests and has bad performance.
231    async fn get_audit_logs(&mut self) -> Result<Vec<VersionedEvent>, CatalogError>;
232
233    /// Get the next ID of `id_type`, without allocating it.
234    async fn get_next_id(&mut self, id_type: &str) -> Result<u64, CatalogError>;
235
236    /// Get the next user ID without allocating it.
237    async fn get_next_user_item_id(&mut self) -> Result<u64, CatalogError> {
238        self.get_next_id(USER_ITEM_ALLOC_KEY).await
239    }
240
241    /// Get the next system ID without allocating it.
242    async fn get_next_system_item_id(&mut self) -> Result<u64, CatalogError> {
243        self.get_next_id(SYSTEM_ITEM_ALLOC_KEY).await
244    }
245
246    /// Get the next system replica id without allocating it.
247    async fn get_next_system_replica_id(&mut self) -> Result<u64, CatalogError> {
248        self.get_next_id(SYSTEM_REPLICA_ID_ALLOC_KEY).await
249    }
250
251    /// Get the next user replica id without allocating it.
252    async fn get_next_user_replica_id(&mut self) -> Result<u64, CatalogError> {
253        self.get_next_id(USER_REPLICA_ID_ALLOC_KEY).await
254    }
255
256    /// Get the deployment generation of this instance.
257    async fn get_deployment_generation(&mut self) -> Result<u64, CatalogError>;
258
259    /// Get a snapshot of the catalog.
260    async fn snapshot(&mut self) -> Result<Snapshot, CatalogError>;
261
262    /// Listen and return all updates that are currently in the catalog.
263    ///
264    /// IMPORTANT: This excludes updates to storage usage.
265    ///
266    /// Returns an error if this instance has been fenced out.
267    async fn sync_to_current_updates(
268        &mut self,
269    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
270
271    // TODO(jkosh44) The fact that the timestamp argument is an exclusive upper bound makes
272    // it difficult to use for readers. For now it's correct and easy to implement, but we should
273    // consider a better API.
274    /// Listen and return all updates in the catalog up to `target_upper`.
275    ///
276    /// IMPORTANT: This excludes updates to storage usage.
277    ///
278    /// Returns an error if this instance has been fenced out.
279    async fn sync_updates(
280        &mut self,
281        target_upper: Timestamp,
282    ) -> Result<Vec<memory::objects::StateUpdate>, CatalogError>;
283
284    /// Fetch the current upper of the catalog state.
285    async fn current_upper(&mut self) -> Timestamp;
286}
287
288/// A read-write API for the durable catalog state.
289#[async_trait]
290#[allow(mismatched_lifetime_syntaxes)]
291pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
292    /// Returns true if the catalog is opened in read only mode, false otherwise.
293    fn is_read_only(&self) -> bool;
294
295    /// Returns true if the catalog is opened is savepoint mode, false otherwise.
296    fn is_savepoint(&self) -> bool;
297
298    /// Marks the bootstrap process as complete.
299    async fn mark_bootstrap_complete(&mut self);
300
301    /// Creates a new durable catalog state transaction.
302    async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
303
304    /// Creates a new transaction initialized from the given [`Snapshot`]
305    /// instead of reading from durable storage. Used for incremental DDL
306    /// dry runs where the transaction state from a previous dry run has been
307    /// saved and needs to be restored so it stays in sync with the accumulated
308    /// `CatalogState`.
309    fn transaction_from_snapshot(
310        &mut self,
311        snapshot: Snapshot,
312    ) -> Result<Transaction, CatalogError>;
313
314    /// Commits a durable catalog state transaction. The transaction will be committed at
315    /// `commit_ts`.
316    ///
317    /// Returns what the upper was directly after the transaction committed.
318    ///
319    /// Panics if `commit_ts` is not greater than or equal to the most recent upper seen by this
320    /// process.
321    async fn commit_transaction(
322        &mut self,
323        txn_batch: TransactionBatch,
324        commit_ts: Timestamp,
325    ) -> Result<Timestamp, CatalogError>;
326
327    /// Advances the upper of the catalog shard to `new_upper`.
328    ///
329    /// This implicitly confirms leadership, as attempting to advance the catalog frontier will
330    /// fail if the writer has been fenced out.
331    async fn advance_upper(&mut self, new_upper: Timestamp) -> Result<(), CatalogError>;
332
333    /// Allocates and returns `amount` IDs of `id_type`.
334    ///
335    /// See [`Self::commit_transaction`] for details on `commit_ts`.
336    #[mz_ore::instrument(level = "debug")]
337    async fn allocate_id(
338        &mut self,
339        id_type: &str,
340        amount: u64,
341        commit_ts: Timestamp,
342    ) -> Result<Vec<u64>, CatalogError> {
343        let start = Instant::now();
344        if amount == 0 {
345            return Ok(Vec::new());
346        }
347        let mut txn = self.transaction().await?;
348        let ids = txn.get_and_increment_id_by(id_type.to_string(), amount)?;
349        txn.commit_internal(commit_ts).await?;
350        self.metrics()
351            .allocate_id_seconds
352            .observe(start.elapsed().as_secs_f64());
353        Ok(ids)
354    }
355
356    /// Allocates and returns `amount` many user [`CatalogItemId`] and [`GlobalId`].
357    ///
358    /// See [`Self::commit_transaction`] for details on `commit_ts`.
359    async fn allocate_user_ids(
360        &mut self,
361        amount: u64,
362        commit_ts: Timestamp,
363    ) -> Result<Vec<(CatalogItemId, GlobalId)>, CatalogError> {
364        let ids = self
365            .allocate_id(USER_ITEM_ALLOC_KEY, amount, commit_ts)
366            .await?;
367        let ids = ids
368            .iter()
369            .map(|id| (CatalogItemId::User(*id), GlobalId::User(*id)))
370            .collect();
371        Ok(ids)
372    }
373
374    /// Allocates and returns both a user [`CatalogItemId`] and [`GlobalId`].
375    ///
376    /// See [`Self::commit_transaction`] for details on `commit_ts`.
377    async fn allocate_user_id(
378        &mut self,
379        commit_ts: Timestamp,
380    ) -> Result<(CatalogItemId, GlobalId), CatalogError> {
381        let id = self.allocate_id(USER_ITEM_ALLOC_KEY, 1, commit_ts).await?;
382        let id = id.into_element();
383        Ok((CatalogItemId::User(id), GlobalId::User(id)))
384    }
385
386    /// Allocates and returns a user [`ClusterId`].
387    ///
388    /// See [`Self::commit_transaction`] for details on `commit_ts`.
389    async fn allocate_user_cluster_id(
390        &mut self,
391        commit_ts: Timestamp,
392    ) -> Result<ClusterId, CatalogError> {
393        let id = self
394            .allocate_id(USER_CLUSTER_ID_ALLOC_KEY, 1, commit_ts)
395            .await?
396            .into_element();
397        Ok(ClusterId::user(id).ok_or(SqlCatalogError::IdExhaustion)?)
398    }
399
400    /// Allocates and returns `amount` many user [`ReplicaId`]s.
401    ///
402    /// See [`Self::commit_transaction`] for details on `commit_ts`.
403    async fn allocate_user_replica_ids(
404        &mut self,
405        amount: u64,
406        commit_ts: Timestamp,
407    ) -> Result<Vec<ReplicaId>, CatalogError> {
408        let ids = self
409            .allocate_id(USER_REPLICA_ID_ALLOC_KEY, amount, commit_ts)
410            .await?;
411        let ids = ids.into_iter().map(ReplicaId::User).collect();
412        Ok(ids)
413    }
414
415    /// Allocates and returns `amount` many system [`ReplicaId`]s.
416    ///
417    /// See [`Self::commit_transaction`] for details on `commit_ts`.
418    async fn allocate_system_replica_ids(
419        &mut self,
420        amount: u64,
421        commit_ts: Timestamp,
422    ) -> Result<Vec<ReplicaId>, CatalogError> {
423        let ids = self
424            .allocate_id(SYSTEM_REPLICA_ID_ALLOC_KEY, amount, commit_ts)
425            .await?;
426        let ids = ids.into_iter().map(ReplicaId::System).collect();
427        Ok(ids)
428    }
429
430    fn shard_id(&self) -> ShardId;
431}
432
433trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
434impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}
435
436/// An iterator that returns audit log events in reverse ID order.
437#[derive(Debug)]
438pub struct AuditLogIterator {
439    // We store an interator instead of a sorted `Vec`, so we can lazily sort the contents on the
440    // first call to `next`, instead of sorting the contents on initialization.
441    audit_logs: Box<dyn AuditLogIteratorTrait>,
442}
443
444impl AuditLogIterator {
445    fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
446        let audit_logs = audit_logs
447            .into_iter()
448            .map(|(kind, ts, diff)| {
449                assert_eq!(
450                    diff,
451                    Diff::ONE,
452                    "audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
453                );
454                assert!(
455                    kind.is_audit_log(),
456                    "unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
457                );
458                let id = kind.audit_log_id();
459                (kind, ts, id)
460            })
461            .sorted_by_key(|(_, ts, id)| (*ts, *id))
462            .map(|(kind, ts, _id)| (kind, ts))
463            .rev()
464            .map(|(kind, ts)| {
465                // Each event will be deserialized lazily on a call to `next`.
466                let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
467                let kind: Option<memory::objects::StateUpdateKind> = (&kind)
468                    .try_into()
469                    .expect("invalid persisted update: {update:#?}");
470                let kind = kind.expect("audit log always produces im-memory updates");
471                let audit_log = match kind {
472                    memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
473                    kind => unreachable!("invalid kind: {kind:?}"),
474                };
475                (audit_log, ts)
476            });
477        Self {
478            audit_logs: Box::new(audit_logs),
479        }
480    }
481}
482
483impl Iterator for AuditLogIterator {
484    type Item = (AuditLog, Timestamp);
485
486    fn next(&mut self) -> Option<Self::Item> {
487        self.audit_logs.next()
488    }
489}
490
491/// Returns the schema of the `Row`s/`SourceData`s stored in the persist
492/// shard backing the catalog.
493pub fn persist_desc() -> RelationDesc {
494    RelationDesc::builder()
495        .with_column("data", SqlScalarType::Jsonb.nullable(false))
496        .finish()
497}
498
499/// A builder to help create an [`OpenableDurableCatalogState`] for tests.
500#[derive(Debug, Clone)]
501pub struct TestCatalogStateBuilder {
502    persist_client: PersistClient,
503    organization_id: Uuid,
504    version: semver::Version,
505    deploy_generation: Option<u64>,
506    metrics: Arc<Metrics>,
507}
508
509impl TestCatalogStateBuilder {
510    pub fn new(persist_client: PersistClient) -> Self {
511        Self {
512            persist_client,
513            organization_id: Uuid::new_v4(),
514            version: semver::Version::new(0, 0, 0),
515            deploy_generation: None,
516            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
517        }
518    }
519
520    pub fn with_organization_id(mut self, organization_id: Uuid) -> Self {
521        self.organization_id = organization_id;
522        self
523    }
524
525    pub fn with_version(mut self, version: semver::Version) -> Self {
526        self.version = version;
527        self
528    }
529
530    pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
531        self.deploy_generation = Some(deploy_generation);
532        self
533    }
534
535    pub fn with_default_deploy_generation(self) -> Self {
536        self.with_deploy_generation(0)
537    }
538
539    pub fn with_metrics(mut self, metrics: Arc<Metrics>) -> Self {
540        self.metrics = metrics;
541        self
542    }
543
544    pub async fn build(self) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
545        persist_backed_catalog_state(
546            self.persist_client,
547            self.organization_id,
548            self.version,
549            self.deploy_generation,
550            self.metrics,
551        )
552        .await
553    }
554
555    pub async fn unwrap_build(self) -> Box<dyn OpenableDurableCatalogState> {
556        self.expect_build("failed to build").await
557    }
558
559    pub async fn expect_build(self, msg: &str) -> Box<dyn OpenableDurableCatalogState> {
560        self.build().await.expect(msg)
561    }
562}
563
564/// Creates an openable durable catalog state implemented using persist.
565///
566/// `deploy_generation` MUST be `Some` to initialize a new catalog.
567pub async fn persist_backed_catalog_state(
568    persist_client: PersistClient,
569    organization_id: Uuid,
570    version: semver::Version,
571    deploy_generation: Option<u64>,
572    metrics: Arc<Metrics>,
573) -> Result<Box<dyn OpenableDurableCatalogState>, DurableCatalogError> {
574    let state = UnopenedPersistCatalogState::new(
575        persist_client,
576        organization_id,
577        version,
578        deploy_generation,
579        metrics,
580    )
581    .await?;
582    Ok(Box::new(state))
583}
584
585pub fn test_bootstrap_args() -> BootstrapArgs {
586    BootstrapArgs {
587        default_cluster_replica_size: "scale=1,workers=1".into(),
588        default_cluster_replication_factor: 1,
589        bootstrap_role: None,
590        cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
591    }
592}