Skip to main content

mz_adapter/coord/sequencer/inner/
cluster.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::time::{Duration, Instant};
12
13use itertools::Itertools;
14use maplit::btreeset;
15use mz_catalog::builtin::BUILTINS;
16use mz_catalog::memory::objects::{
17    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
18};
19use mz_compute_types::config::ComputeReplicaConfig;
20use mz_controller::clusters::{
21    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
22    ReplicaLogging,
23};
24use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL};
25use mz_ore::cast::CastFrom;
26use mz_ore::instrument;
27use mz_repr::role_id::RoleId;
28use mz_sql::ast::{Ident, QualifiedReplica};
29use mz_sql::catalog::{CatalogCluster, ObjectType};
30use mz_sql::plan::{
31    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
32    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
33    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
34    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
35};
36use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
39use tracing::{Instrument, Span, debug};
40
41use super::return_if_err;
42use crate::AdapterError::AlterClusterWhilePendingReplicas;
43use crate::catalog::{self, Op, ReplicaCreateDropReason};
44use crate::coord::{
45    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
46    Message, PlanValidity, StageResult, Staged,
47};
48use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};
49
/// Suffix appended to the names of replicas created for an in-flight
/// (zero-downtime) `ALTER CLUSTER`; it is stripped again when the alteration
/// is finalized and the pending replicas are promoted.
const PENDING_REPLICA_SUFFIX: &str = "-pending";
51
52impl Staged for ClusterStage {
53    type Ctx = ExecuteContext;
54
55    fn validity(&mut self) -> &mut PlanValidity {
56        match self {
57            Self::Alter(stage) => &mut stage.validity,
58            Self::WaitForHydrated(stage) => &mut stage.validity,
59            Self::Finalize(stage) => &mut stage.validity,
60        }
61    }
62
63    async fn stage(
64        self,
65        coord: &mut Coordinator,
66        ctx: &mut ExecuteContext,
67    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
68        match self {
69            Self::Alter(stage) => {
70                coord
71                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
72                    .await
73            }
74            Self::WaitForHydrated(stage) => {
75                let AlterClusterWaitForHydrated {
76                    validity,
77                    plan,
78                    new_config,
79                    timeout_time,
80                    on_timeout,
81                } = stage;
82                coord
83                    .check_if_pending_replicas_hydrated_stage(
84                        ctx.session(),
85                        plan,
86                        new_config,
87                        timeout_time,
88                        on_timeout,
89                        validity,
90                    )
91                    .await
92            }
93            Self::Finalize(stage) => {
94                coord
95                    .finalize_alter_cluster_stage(
96                        ctx.session(),
97                        stage.plan.clone(),
98                        stage.new_config.clone(),
99                    )
100                    .await
101            }
102        }
103    }
104
105    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
106        Message::ClusterStageReady {
107            ctx,
108            span,
109            stage: self,
110        }
111    }
112
113    fn cancel_enabled(&self) -> bool {
114        true
115    }
116}
117
118impl Coordinator {
    /// Entry point for a staged `ALTER CLUSTER`: validates the plan into the
    /// initial `ClusterStage` and hands it to the staged-execution machinery.
    /// A validation error is returned to the client via `return_if_err!`.
    #[instrument]
    pub(crate) async fn sequence_alter_cluster_staged(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterClusterPlan,
    ) {
        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
        self.sequence_staged(ctx, Span::current(), stage).await;
    }
128
    /// Builds the initial `ClusterStage::Alter` stage for an `ALTER CLUSTER`,
    /// capturing a `PlanValidity` snapshot (current catalog revision, the
    /// target cluster id, and the session's role metadata) so that later
    /// stages can detect concurrent catalog changes that would invalidate
    /// the plan.
    #[instrument]
    async fn alter_cluster_validate(
        &self,
        session: &Session,
        plan: plan::AlterClusterPlan,
    ) -> Result<ClusterStage, AdapterError> {
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::new(),
            Some(plan.id.clone()),
            None,
            session.role_metadata().clone(),
        );
        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
    }
144
145    async fn sequence_alter_cluster_stage(
146        &mut self,
147        session: &Session,
148        plan: plan::AlterClusterPlan,
149        validity: PlanValidity,
150    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
151        let AlterClusterPlan {
152            id: cluster_id,
153            name: _,
154            ref options,
155            ref strategy,
156        } = plan;
157
158        use mz_catalog::memory::objects::ClusterVariant::*;
159        use mz_sql::plan::AlterOptionParameter::*;
160        let cluster = self.catalog.get_cluster(cluster_id);
161        let config = cluster.config.clone();
162        let mut new_config = config.clone();
163
164        match (&new_config.variant, &options.managed) {
165            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
166            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
167            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
168            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
169                // Generate a minimal correct configuration
170
171                // Size adjusted later when sequencing the actual configuration change.
172                let size = "".to_string();
173                let logging = ReplicaLogging {
174                    log_logging: false,
175                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
176                };
177                new_config.variant = Managed(ClusterVariantManaged {
178                    size,
179                    availability_zones: Default::default(),
180                    logging,
181                    replication_factor: 1,
182                    optimizer_feature_overrides: Default::default(),
183                    schedule: Default::default(),
184                });
185            }
186        }
187
188        match &mut new_config.variant {
189            Managed(ClusterVariantManaged {
190                size,
191                availability_zones,
192                logging,
193                replication_factor,
194                optimizer_feature_overrides: _,
195                schedule,
196            }) => {
197                match &options.size {
198                    Set(s) => size.clone_from(s),
199                    Reset => coord_bail!("SIZE has no default value"),
200                    Unchanged => {}
201                }
202                match &options.availability_zones {
203                    Set(az) => availability_zones.clone_from(az),
204                    Reset => *availability_zones = Default::default(),
205                    Unchanged => {}
206                }
207                match &options.introspection_debugging {
208                    Set(id) => logging.log_logging = *id,
209                    Reset => logging.log_logging = false,
210                    Unchanged => {}
211                }
212                match &options.introspection_interval {
213                    Set(ii) => logging.interval = ii.0,
214                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
215                    Unchanged => {}
216                }
217                match &options.replication_factor {
218                    Set(rf) => *replication_factor = *rf,
219                    Reset => {
220                        *replication_factor = self
221                            .catalog
222                            .system_config()
223                            .default_cluster_replication_factor()
224                    }
225                    Unchanged => {}
226                }
227                match &options.schedule {
228                    Set(new_schedule) => {
229                        *schedule = new_schedule.clone();
230                    }
231                    Reset => *schedule = Default::default(),
232                    Unchanged => {}
233                }
234                if !matches!(options.replicas, Unchanged) {
235                    coord_bail!("Cannot change REPLICAS of managed clusters");
236                }
237            }
238            Unmanaged => {
239                if !matches!(options.size, Unchanged) {
240                    coord_bail!("Cannot change SIZE of unmanaged clusters");
241                }
242                if !matches!(options.availability_zones, Unchanged) {
243                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
244                }
245                if !matches!(options.introspection_debugging, Unchanged) {
246                    coord_bail!("Cannot change INTROSPECTION DEGUBBING of unmanaged clusters");
247                }
248                if !matches!(options.introspection_interval, Unchanged) {
249                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
250                }
251                if !matches!(options.replication_factor, Unchanged) {
252                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
253                }
254            }
255        }
256
257        match &options.workload_class {
258            Set(wc) => new_config.workload_class.clone_from(wc),
259            Reset => new_config.workload_class = None,
260            Unchanged => {}
261        }
262
263        if new_config == config {
264            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
265                ObjectType::Cluster,
266            )));
267        }
268
269        match (&config.variant, &new_config.variant) {
270            (Managed(_), Managed(new_config_managed)) => {
271                let alter_followup = self
272                    .sequence_alter_cluster_managed_to_managed(
273                        Some(session),
274                        cluster_id,
275                        new_config.clone(),
276                        ReplicaCreateDropReason::Manual,
277                        strategy.clone(),
278                    )
279                    .await?;
280                if alter_followup == NeedsFinalization::Yes {
281                    // For non backgrounded zero-downtime alters, store the
282                    // cluster_id in the ConnMeta to allow for cancellation.
283                    self.active_conns
284                        .get_mut(session.conn_id())
285                        .expect("There must be an active connection")
286                        .pending_cluster_alters
287                        .insert(cluster_id.clone());
288                    let new_config_managed = new_config_managed.clone();
289                    return match &strategy {
290                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
291                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
292                                .into(),
293                        )),
294                        AlterClusterPlanStrategy::For(duration) => {
295                            let span = Span::current();
296                            let plan = plan.clone();
297                            let duration = duration.clone().to_owned();
298                            Ok(StageResult::Handle(mz_ore::task::spawn(
299                                || "Finalize Alter Cluster",
300                                async move {
301                                    tokio::time::sleep(duration).await;
302                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
303                                        validity,
304                                        plan,
305                                        new_config: new_config_managed,
306                                    });
307                                    Ok(Box::new(stage))
308                                }
309                                .instrument(span),
310                            )))
311                        }
312                        AlterClusterPlanStrategy::UntilReady {
313                            timeout,
314                            on_timeout,
315                        } => Ok(StageResult::Immediate(Box::new(
316                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
317                                validity,
318                                plan: plan.clone(),
319                                new_config: new_config_managed.clone(),
320                                timeout_time: Instant::now() + timeout.to_owned(),
321                                on_timeout: on_timeout.to_owned(),
322                            }),
323                        ))),
324                    };
325                }
326            }
327            (Unmanaged, Managed(_)) => {
328                self.sequence_alter_cluster_unmanaged_to_managed(
329                    session,
330                    cluster_id,
331                    new_config,
332                    options.to_owned(),
333                )
334                .await?;
335            }
336            (Managed(_), Unmanaged) => {
337                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
338                    .await?;
339            }
340            (Unmanaged, Unmanaged) => {
341                self.sequence_alter_cluster_unmanaged_to_unmanaged(
342                    session,
343                    cluster_id,
344                    new_config,
345                    options.replicas.clone(),
346                )
347                .await?;
348            }
349        }
350
351        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
352            ObjectType::Cluster,
353        )))
354    }
355
356    async fn finalize_alter_cluster_stage(
357        &mut self,
358        session: &Session,
359        AlterClusterPlan {
360            id: cluster_id,
361            name: cluster_name,
362            ..
363        }: AlterClusterPlan,
364        new_config: ClusterVariantManaged,
365    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
366        let cluster = self.catalog.get_cluster(cluster_id);
367        let workload_class = cluster.config.workload_class.clone();
368        let mut ops = vec![];
369
370        // Gather the ops to remove the non pending replicas
371        // Also skip any billed_as free replicas
372        let remove_replicas = cluster
373            .replicas()
374            .filter_map(|r| {
375                if !r.config.location.pending() && !r.config.location.internal() {
376                    Some(catalog::DropObjectInfo::ClusterReplica((
377                        cluster_id.clone(),
378                        r.replica_id,
379                        ReplicaCreateDropReason::Manual,
380                    )))
381                } else {
382                    None
383                }
384            })
385            .collect();
386        ops.push(catalog::Op::DropObjects(remove_replicas));
387
388        // Gather the Ops to remove the "-pending" suffix from the name and set
389        // pending to false
390        let finalize_replicas: Vec<catalog::Op> = cluster
391            .replicas()
392            .filter_map(|r| {
393                if r.config.location.pending() {
394                    let cluster_ident = match Ident::new(cluster.name.clone()) {
395                        Ok(id) => id,
396                        Err(err) => {
397                            return Some(Err(AdapterError::internal(
398                                "Unexpected error parsing cluster name",
399                                err,
400                            )));
401                        }
402                    };
403                    let replica_ident = match Ident::new(r.name.clone()) {
404                        Ok(id) => id,
405                        Err(err) => {
406                            return Some(Err(AdapterError::internal(
407                                "Unexpected error parsing replica name",
408                                err,
409                            )));
410                        }
411                    };
412                    Some(Ok((cluster_ident, replica_ident, r)))
413                } else {
414                    None
415                }
416            })
417            // Early collection is to handle errors from generating of the
418            // Idents
419            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
420            .into_iter()
421            .map(|(cluster_ident, replica_ident, replica)| {
422                let mut new_replica_config = replica.config.clone();
423                debug!("Promoting replica: {}", replica.name);
424                match new_replica_config.location {
425                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
426                        ref mut pending,
427                        ..
428                    }) => {
429                        *pending = false;
430                    }
431                    mz_controller::clusters::ReplicaLocation::Unmanaged(_) => {}
432                }
433
434                let mut replica_ops = vec![];
435                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
436                if let Some(to_name) = to_name {
437                    replica_ops.push(catalog::Op::RenameClusterReplica {
438                        cluster_id: cluster_id.clone(),
439                        replica_id: replica.replica_id.to_owned(),
440                        name: QualifiedReplica {
441                            cluster: cluster_ident,
442                            replica: replica_ident,
443                        },
444                        to_name: to_name.to_owned(),
445                    });
446                }
447                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
448                    cluster_id,
449                    replica_id: replica.replica_id.to_owned(),
450                    config: new_replica_config,
451                });
452                replica_ops
453            })
454            .flatten()
455            .collect();
456
457        ops.extend(finalize_replicas);
458
459        // Add the Op to update the cluster state
460        ops.push(Op::UpdateClusterConfig {
461            id: cluster_id,
462            name: cluster_name,
463            config: ClusterConfig {
464                variant: ClusterVariant::Managed(new_config),
465                workload_class: workload_class.clone(),
466            },
467        });
468        self.catalog_transact(Some(session), ops).await?;
469        // Remove the cluster being altered from the ConnMeta
470        // pending_cluster_alters BTreeSet
471        self.active_conns
472            .get_mut(session.conn_id())
473            .expect("There must be an active connection")
474            .pending_cluster_alters
475            .remove(&cluster_id);
476
477        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
478            ObjectType::Cluster,
479        )))
480    }
481
    /// Stage of `ALTER CLUSTER ... WITH (WAIT UNTIL READY ...)` that polls
    /// until the pending replicas are hydrated, then moves to finalization.
    ///
    /// If `timeout_time` has passed, the configured `on_timeout` action runs:
    /// `Rollback` drops the reconfiguration replicas and returns
    /// `AlterClusterTimeout`, while `Commit` schedules finalization after the
    /// system's check-ready interval regardless of hydration. Otherwise
    /// hydration of the pending replicas is checked on both the compute and
    /// storage controllers and, if incomplete, this stage re-schedules itself
    /// after a short sleep.
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        // wait and re-signal wait for hydrated if not hydrated
        let cluster = self.catalog.get_cluster(plan.id);
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        // Check For timeout
        if Instant::now() > timeout_time {
            // Timed out handle timeout action
            match on_timeout {
                OnTimeoutAction::Rollback => {
                    // Forget the in-flight alter for this connection, then
                    // drop the pending replicas created for it.
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                OnTimeoutAction::Commit => {
                    // Commit even though hydration was not observed: hand off
                    // to the finalize stage after the configured interval.
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        // Query both controllers for hydration of the pending replicas; the
        // compute check yields a future that is awaited in the task below,
        // the storage check returns synchronously.
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // We're done
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config,
                    })))
                } else {
                    // Check later
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }
583
584    #[mz_ore::instrument(level = "debug")]
585    pub(crate) async fn sequence_create_cluster(
586        &mut self,
587        session: &Session,
588        CreateClusterPlan {
589            name,
590            variant,
591            workload_class,
592        }: CreateClusterPlan,
593    ) -> Result<ExecuteResponse, AdapterError> {
594        tracing::debug!("sequence_create_cluster");
595
596        let id_ts = self.get_catalog_write_ts().await;
597        let id = self.catalog().allocate_user_cluster_id(id_ts).await?;
598        // The catalog items for the introspection sources are shared between all replicas
599        // of a compute instance, so we create them unconditionally during instance creation.
600        // Whether a replica actually maintains introspection arrangements is determined by the
601        // per-replica introspection configuration.
602        let introspection_sources = BUILTINS::logs().collect();
603        let cluster_variant = match &variant {
604            CreateClusterVariant::Managed(plan) => {
605                let logging = if let Some(config) = plan.compute.introspection {
606                    ReplicaLogging {
607                        log_logging: config.debugging,
608                        interval: Some(config.interval),
609                    }
610                } else {
611                    ReplicaLogging::default()
612                };
613                ClusterVariant::Managed(ClusterVariantManaged {
614                    size: plan.size.clone(),
615                    availability_zones: plan.availability_zones.clone(),
616                    logging,
617                    replication_factor: plan.replication_factor,
618                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
619                    schedule: plan.schedule.clone(),
620                })
621            }
622            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
623        };
624        let config = ClusterConfig {
625            variant: cluster_variant,
626            workload_class,
627        };
628        let ops = vec![catalog::Op::CreateCluster {
629            id,
630            name: name.clone(),
631            introspection_sources,
632            owner_id: *session.current_role_id(),
633            config,
634        }];
635
636        match variant {
637            CreateClusterVariant::Managed(plan) => {
638                self.sequence_create_managed_cluster(session, plan, id, ops)
639                    .await
640            }
641            CreateClusterVariant::Unmanaged(plan) => {
642                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
643                    .await
644            }
645        }
646    }
647
    /// Creates a managed cluster together with its `replication_factor`
    /// replicas in a single catalog transaction, extending the `ops`
    /// assembled by `sequence_create_cluster`.
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_managed_cluster(
        &mut self,
        session: &Session,
        CreateClusterManagedPlan {
            availability_zones,
            compute,
            replication_factor,
            size,
            optimizer_feature_overrides: _,
            schedule: _,
        }: CreateClusterManagedPlan,
        cluster_id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_managed_cluster");

        self.ensure_valid_azs(availability_zones.iter())?;

        // The requested size must be one the current role is allowed to use.
        let role_id = session.role_metadata().current_role;
        self.catalog.ensure_valid_replica_size(
            &self
                .catalog()
                .get_role_allowed_cluster_sizes(&Some(role_id)),
            &size,
        )?;

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if cluster_id.is_user() {
            self.validate_resource_limit(
                0,
                i64::from(replication_factor),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        // One create op per replica; names come from
        // `managed_cluster_replica_name`. AZs are passed along only when the
        // plan specified any.
        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
            self.create_managed_cluster_replica_op(
                cluster_id,
                replica_name,
                &compute,
                &size,
                &mut ops,
                if availability_zones.is_empty() {
                    None
                } else {
                    Some(availability_zones.as_ref())
                },
                false,
                *session.current_role_id(),
                ReplicaCreateDropReason::Manual,
            )?;
        }

        self.catalog_transact(Some(session), ops).await?;

        Ok(ExecuteResponse::CreatedCluster)
    }
711
712    fn create_managed_cluster_replica_op(
713        &self,
714        cluster_id: ClusterId,
715        name: String,
716        compute: &mz_sql::plan::ComputeReplicaConfig,
717        size: &String,
718        ops: &mut Vec<Op>,
719        azs: Option<&[String]>,
720        pending: bool,
721        owner_id: RoleId,
722        reason: ReplicaCreateDropReason,
723    ) -> Result<(), AdapterError> {
724        let location = mz_catalog::durable::ReplicaLocation::Managed {
725            availability_zone: None,
726            billed_as: None,
727            internal: false,
728            size: size.clone(),
729            pending,
730        };
731
732        let logging = if let Some(config) = compute.introspection {
733            ReplicaLogging {
734                log_logging: config.debugging,
735                interval: Some(config.interval),
736            }
737        } else {
738            ReplicaLogging::default()
739        };
740
741        let config = ReplicaConfig {
742            location: self.catalog().concretize_replica_location(
743                location,
744                &self
745                    .catalog()
746                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
747                azs,
748            )?,
749            compute: ComputeReplicaConfig { logging },
750        };
751
752        ops.push(catalog::Op::CreateClusterReplica {
753            cluster_id,
754            name,
755            config,
756            owner_id,
757            reason,
758        });
759        Ok(())
760    }
761
762    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
763        &self,
764        azs: I,
765    ) -> Result<(), AdapterError> {
766        let cat_azs = self.catalog().state().availability_zones();
767        for az in azs.into_iter() {
768            if !cat_azs.contains(az) {
769                return Err(AdapterError::InvalidClusterReplicaAz {
770                    az: az.to_string(),
771                    expected: cat_azs.to_vec(),
772                });
773            }
774        }
775        Ok(())
776    }
777
    /// Sequences the creation of an unmanaged cluster (`CREATE CLUSTER ...
    /// REPLICAS (...)`) together with all of its explicitly listed replicas.
    ///
    /// `ops` already holds the catalog operations that create the cluster
    /// itself; this function appends one `CreateClusterReplica` op per entry
    /// in `replicas` and commits the whole batch in a single catalog
    /// transaction.
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        // Reject any explicitly requested availability zone that is not
        // configured in the catalog.
        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if id.is_user() {
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            // Translate the planned config into a durable replica location.
            // (An orchestrated replica without an explicit AZ presumably has
            // one chosen by `concretize_replica_location` below — TODO
            // confirm.)
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    computectl_addrs,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        computectl_addrs,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    internal,
                    size,
                } => {
                    // Only internal users have access to INTERNAL and BILLED AS
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    // BILLED AS implies the INTERNAL flag.
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            // Introspection settings from the plan, or the defaults when
            // introspection was not configured.
            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            let role_id = session.role_metadata().current_role;
            // Resolve the durable location into a concrete one, restricted
            // to the cluster sizes this role is allowed to use.
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        Ok(ExecuteResponse::CreatedCluster)
    }
890
    /// Sequences `CREATE CLUSTER REPLICA`, adding a single replica to an
    /// existing cluster.
    ///
    /// Validates any explicitly requested availability zone, resolves the
    /// replica's concrete location against the cluster sizes the current
    /// role may use, and enforces the INTERNAL / BILLED AS permission rules
    /// before committing the new replica to the catalog. The new replica is
    /// owned by the cluster's owner, not by the session's current role.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Choose default AZ if necessary
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    computectl_addrs,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                internal,
                size,
            } => {
                // An explicitly requested AZ must be one the catalog knows
                // about.
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        // Introspection settings from the plan, or the defaults when
        // introspection was not configured.
        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let role_id = session.role_metadata().current_role;
        // Resolve the durable location into a concrete one, restricted to
        // the cluster sizes this role is allowed to use.
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                // Planning ensures all replicas in this codepath
                // are unmanaged.
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            // Only internal users have access to INTERNAL and BILLED AS
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            // Managed clusters require the INTERNAL flag.
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            // BILLED AS implies the INTERNAL flag.
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        // Replicas have the same owner as their cluster.
        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }
998
    /// Alters a managed cluster to a new managed configuration, creating and
    /// dropping replicas as needed to realize the new size, availability
    /// zones, logging settings, and replication factor.
    ///
    /// When this is called by the automated cluster scheduling, `reason` should
    /// contain information on why is a cluster being turned On/Off. It will be forwarded to the
    /// `details` field of the audit log event that records creating or dropping replicas.
    ///
    /// Returns [`NeedsFinalization::Yes`] when a zero-downtime strategy was
    /// requested and pending replicas were created that still need to be
    /// promoted (the cluster config is then updated during finalization, not
    /// here); otherwise [`NeedsFinalization::No`].
    ///
    /// # Panics
    ///
    /// Panics if the identified cluster is not a managed cluster.
    /// Panics if `new_config` is not a configuration for a managed cluster.
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        // Destructure the current managed config; panics per the contract
        // above if the cluster is not managed.
        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        // Destructure the requested managed config; panics per the contract
        // above if `new_config` is not managed.
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // Replica sizes are restricted per role; validate the new size
        // against what the session's role (if any) is allowed to use.
        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // check for active updates
        if cluster.replicas().any(|r| r.config.location.pending()) {
            return Err(AlterClusterWhilePendingReplicas);
        }

        // Rebuild the per-replica introspection config from the new logging
        // settings, for use when creating replacement replicas below.
        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        // Size, availability zones, or logging changed: every replica must
        // be replaced (this branch also handles any simultaneous change to
        // the replication factor, since it creates `new_replication_factor`
        // replicas).
        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
        {
            self.ensure_valid_azs(new_availability_zones.iter())?;
            // If we're not doing a zero-downtime reconfig tear down all
            // replicas, create new ones else create the pending replicas and
            // return early asking for finalization
            match strategy {
                AlterClusterPlanStrategy::None => {
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                }
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    // Zero-downtime path: create suffixed, pending replicas
                    // alongside the existing ones and defer the swap to
                    // finalization.
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Adjust replica count down
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Adjust replica count up
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    // AVAILABILITY ZONES hasn't changed, so existing replicas don't need to be
                    // rescheduled.
                    Some(new_availability_zones.as_ref()),
                    false,
                    owner_id,
                    reason.clone(),
                )?;
            }
        }

        // If finalization is needed, finalization should update the cluster
        // config.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name: name.clone(),
                    config: new_config,
                });
            }
            NeedsFinalization::Yes => {}
        }
        self.catalog_transact(session, ops).await?;
        Ok(finalization_needed)
    }
1185
    /// Converts an unmanaged cluster into a managed one by writing the new
    /// managed configuration to the catalog.
    ///
    /// The conversion reuses the cluster's existing replicas, so they must
    /// already look like the replicas of a managed cluster: all the same
    /// size, named `r1..rN`, with availability zones drawn from the new
    /// config, and a count matching the (possibly inferred) replication
    /// factor. Violations produce user-facing errors rather than panics.
    ///
    /// # Panics
    ///
    /// Panics if `new_config` is not a configuration for a managed cluster.
    async fn sequence_alter_cluster_unmanaged_to_managed(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        mut new_config: ClusterConfig,
        options: PlanClusterOption,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let cluster_name = cluster.name().to_string();

        // Destructure the requested managed config; panics per the contract
        // above if it is not managed. `new_size` and
        // `new_replication_factor` may be overwritten below when they are
        // inferred from the existing replicas.
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: _,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &mut new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // Validate replication factor parameter
        let user_replica_count = cluster
            .user_replicas()
            .count()
            .try_into()
            .expect("must_fit");
        match options.replication_factor {
            AlterOptionParameter::Set(_) => {
                // Validate that the replication factor matches the current length only if specified.
                if user_replica_count != *new_replication_factor {
                    coord_bail!(
                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
                    );
                }
            }
            _ => {
                // REPLICATION FACTOR was not specified: infer it from the
                // number of existing user replicas.
                *new_replication_factor = user_replica_count;
            }
        }

        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        // Validate per-replica configuration
        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());

                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            // We didn't collect any size, so the user has to name it.
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                AlterOptionParameter::Set(_) => {} // Was set within the calling function.
            }
        } else if sizes.len() == 1 {
            // All replicas share one size: adopt it unless the user asked
            // for a conflicting SIZE.
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            // Mixed replica sizes cannot be represented by a managed
            // cluster.
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        // Every existing replica must carry one of the canonical managed
        // names (`r1..rN`); anything left over is invalid.
        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }
1315
1316    async fn sequence_alter_cluster_managed_to_unmanaged(
1317        &mut self,
1318        session: &Session,
1319        cluster_id: ClusterId,
1320        new_config: ClusterConfig,
1321    ) -> Result<(), AdapterError> {
1322        let cluster = self.catalog().get_cluster(cluster_id);
1323
1324        let ops = vec![catalog::Op::UpdateClusterConfig {
1325            id: cluster_id,
1326            name: cluster.name().to_string(),
1327            config: new_config,
1328        }];
1329
1330        self.catalog_transact(Some(session), ops).await?;
1331        Ok(())
1332    }
1333
1334    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
1335        &mut self,
1336        session: &Session,
1337        cluster_id: ClusterId,
1338        new_config: ClusterConfig,
1339        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
1340    ) -> Result<(), AdapterError> {
1341        if !matches!(replicas, AlterOptionParameter::Unchanged) {
1342            coord_bail!("Cannot alter replicas in unmanaged cluster");
1343        }
1344
1345        let cluster = self.catalog().get_cluster(cluster_id);
1346
1347        let ops = vec![catalog::Op::UpdateClusterConfig {
1348            id: cluster_id,
1349            name: cluster.name().to_string(),
1350            config: new_config,
1351        }];
1352
1353        self.catalog_transact(Some(session), ops).await?;
1354        Ok(())
1355    }
1356
1357    pub(crate) async fn sequence_alter_cluster_rename(
1358        &mut self,
1359        ctx: &mut ExecuteContext,
1360        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
1361    ) -> Result<ExecuteResponse, AdapterError> {
1362        let op = Op::RenameCluster {
1363            id,
1364            name,
1365            to_name,
1366            check_reserved_names: true,
1367        };
1368        match self
1369            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
1370            .await
1371        {
1372            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1373            Err(err) => Err(err),
1374        }
1375    }
1376
1377    pub(crate) async fn sequence_alter_cluster_swap(
1378        &mut self,
1379        ctx: &mut ExecuteContext,
1380        AlterClusterSwapPlan {
1381            id_a,
1382            id_b,
1383            name_a,
1384            name_b,
1385            name_temp,
1386        }: AlterClusterSwapPlan,
1387    ) -> Result<ExecuteResponse, AdapterError> {
1388        let op_a = Op::RenameCluster {
1389            id: id_a,
1390            name: name_a.clone(),
1391            to_name: name_temp.clone(),
1392            check_reserved_names: false,
1393        };
1394        let op_b = Op::RenameCluster {
1395            id: id_b,
1396            name: name_b.clone(),
1397            to_name: name_a,
1398            check_reserved_names: false,
1399        };
1400        let op_temp = Op::RenameCluster {
1401            id: id_a,
1402            name: name_temp,
1403            to_name: name_b,
1404            check_reserved_names: false,
1405        };
1406
1407        match self
1408            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
1409                Box::pin(async {})
1410            })
1411            .await
1412        {
1413            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1414            Err(err) => Err(err),
1415        }
1416    }
1417
1418    pub(crate) async fn sequence_alter_cluster_replica_rename(
1419        &mut self,
1420        session: &Session,
1421        AlterClusterReplicaRenamePlan {
1422            cluster_id,
1423            replica_id,
1424            name,
1425            to_name,
1426        }: AlterClusterReplicaRenamePlan,
1427    ) -> Result<ExecuteResponse, AdapterError> {
1428        let op = catalog::Op::RenameClusterReplica {
1429            cluster_id,
1430            replica_id,
1431            name,
1432            to_name,
1433        };
1434        match self.catalog_transact(Some(session), vec![op]).await {
1435            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
1436            Err(err) => Err(err),
1437        }
1438    }
1439
1440    /// Convert a [`AlterSetClusterPlan`] to a sequence of catalog operators and adjust state.
1441    pub(crate) async fn sequence_alter_set_cluster(
1442        &self,
1443        _session: &Session,
1444        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
1445    ) -> Result<ExecuteResponse, AdapterError> {
1446        // TODO: This function needs to be implemented.
1447
1448        // Satisfy Clippy that this is an async func.
1449        async {}.await;
1450        let entry = self.catalog().get_entry(&id);
1451        match entry.item().typ() {
1452            _ => {
1453                // Unexpected; planner permitted unsupported plan.
1454                Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
1455            }
1456        }
1457    }
1458}
1459
/// Returns the canonical name of the replica at `index` in a managed
/// cluster. Managed replicas are named `r1`, `r2`, ... (one-based).
fn managed_cluster_replica_name(index: u32) -> String {
    let ordinal = index + 1;
    // "r" plus at most 10 decimal digits for a u32.
    let mut name = String::with_capacity(11);
    name.push('r');
    name.push_str(&ordinal.to_string());
    name
}
1463
/// The type of finalization needed after an
/// operation such as alter_cluster_managed_to_managed.
// `Debug` added so the enum can appear in logs/assertions; `Eq` is free
// alongside the existing `PartialEq` since there are no non-`Eq` fields.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum NeedsFinalization {
    /// Wait for the provided duration before finalizing
    Yes,
    /// No finalization step is required.
    No,
}