// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

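//! Coordinator logic for sequencing cluster DDL: `CREATE CLUSTER`,
//! `CREATE CLUSTER REPLICA`, and the `ALTER CLUSTER` family, including the
//! staged, cancelable flow used for zero-downtime cluster reconfiguration
//! (see [`ClusterStage`]).
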
use std::collections::BTreeSet;
use std::time::{Duration, Instant};

use itertools::Itertools;
use maplit::btreeset;
use mz_catalog::builtin::BUILTINS;
use mz_catalog::memory::objects::{
    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
};
use mz_compute_types::config::ComputeReplicaConfig;
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
    ReplicaLogging,
};
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::role_id::RoleId;
use mz_sql::ast::{Ident, QualifiedReplica};
use mz_sql::catalog::{CatalogCluster, ObjectType};
use mz_sql::plan::{
    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
};
use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
use tracing::{Instrument, Span, debug};

use super::return_if_err;
use crate::AdapterError::AlterClusterWhilePendingReplicas;
use crate::catalog::{self, Op, ReplicaCreateDropReason};
use crate::coord::{
    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
    Message, PlanValidity, StageResult, Staged,
};
use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};

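/// Suffix appended to the names of replicas created by a zero-downtime
/// `ALTER CLUSTER`, e.g. `r1-pending`. The suffix is stripped again when the
/// pending replicas are promoted during finalization.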
const PENDING_REPLICA_SUFFIX: &str = "-pending";

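// A staged `ALTER CLUSTER` proceeds through up to three stages: `Alter`
// applies the configuration change (for zero-downtime alters, by creating
// pending replicas), `WaitForHydrated` polls until the pending replicas are
// hydrated, and `Finalize` promotes the pending replicas and drops the old
// ones.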
impl Staged for ClusterStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            Self::Alter(stage) => &mut stage.validity,
            Self::WaitForHydrated(stage) => &mut stage.validity,
            Self::Finalize(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
        match self {
            Self::Alter(stage) => {
                coord
                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
                    .await
            }
            Self::WaitForHydrated(stage) => {
                let AlterClusterWaitForHydrated {
                    validity,
                    plan,
                    new_config,
                    workload_class,
                    timeout_time,
                    on_timeout,
                } = stage;
                coord
                    .check_if_pending_replicas_hydrated_stage(
                        ctx.session(),
                        plan,
                        new_config,
                        workload_class,
                        timeout_time,
                        on_timeout,
                        validity,
                    )
                    .await
            }
            Self::Finalize(stage) => {
                coord
                    .finalize_alter_cluster_stage(
                        ctx.session(),
                        stage.plan.clone(),
                        stage.new_config.clone(),
                        stage.workload_class.clone(),
                    )
                    .await
            }
        }
    }

    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
        Message::ClusterStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
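    /// Entry point for the staged `ALTER CLUSTER` flow: validates the plan
    /// and kicks off the `Alter` stage.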
    #[instrument]
    pub(crate) async fn sequence_alter_cluster_staged(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterClusterPlan,
    ) {
        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

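    /// Constructs the initial `Alter` stage, capturing a [`PlanValidity`] so
    /// that the plan can be revalidated if the catalog changes before later
    /// stages run.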
    #[instrument]
    async fn alter_cluster_validate(
        &self,
        session: &Session,
        plan: plan::AlterClusterPlan,
    ) -> Result<ClusterStage, AdapterError> {
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::new(),
            Some(plan.id.clone()),
            None,
            session.role_metadata().clone(),
        );
        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
    }

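    /// Applies the requested option changes to a copy of the cluster's config
    /// and dispatches to the appropriate managed/unmanaged transition. For
    /// zero-downtime managed-to-managed alters, this returns a follow-up
    /// `WaitForHydrated` or `Finalize` stage instead of completing
    /// immediately.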
    async fn sequence_alter_cluster_stage(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let AlterClusterPlan {
            id: cluster_id,
            name: _,
            ref options,
            ref strategy,
        } = plan;

        use mz_catalog::memory::objects::ClusterVariant::*;
        use mz_sql::plan::AlterOptionParameter::*;
        let cluster = self.catalog.get_cluster(cluster_id);
        let config = cluster.config.clone();
        let mut new_config = config.clone();

        match (&new_config.variant, &options.managed) {
            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
                // Generate a minimal correct configuration

                // Size adjusted later when sequencing the actual configuration change.
                let size = "".to_string();
                let logging = ReplicaLogging {
                    log_logging: false,
                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                };
                new_config.variant = Managed(ClusterVariantManaged {
                    size,
                    availability_zones: Default::default(),
                    logging,
                    replication_factor: 1,
                    optimizer_feature_overrides: Default::default(),
                    schedule: Default::default(),
                });
            }
        }

        match &mut new_config.variant {
            Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides: _,
                schedule,
            }) => {
                match &options.size {
                    Set(s) => size.clone_from(s),
                    Reset => coord_bail!("SIZE has no default value"),
                    Unchanged => {}
                }
                match &options.availability_zones {
                    Set(az) => availability_zones.clone_from(az),
                    Reset => *availability_zones = Default::default(),
                    Unchanged => {}
                }
                match &options.introspection_debugging {
                    Set(id) => logging.log_logging = *id,
                    Reset => logging.log_logging = false,
                    Unchanged => {}
                }
                match &options.introspection_interval {
                    Set(ii) => logging.interval = ii.0,
                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                    Unchanged => {}
                }
                match &options.replication_factor {
                    Set(rf) => *replication_factor = *rf,
                    Reset => {
                        *replication_factor = self
                            .catalog
                            .system_config()
                            .default_cluster_replication_factor()
                    }
                    Unchanged => {}
                }
                match &options.schedule {
                    Set(new_schedule) => {
                        *schedule = new_schedule.clone();
                    }
                    Reset => *schedule = Default::default(),
                    Unchanged => {}
                }
                if !matches!(options.replicas, Unchanged) {
                    coord_bail!("Cannot change REPLICAS of managed clusters");
                }
            }
            Unmanaged => {
                if !matches!(options.size, Unchanged) {
                    coord_bail!("Cannot change SIZE of unmanaged clusters");
                }
                if !matches!(options.availability_zones, Unchanged) {
                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
                }
                if !matches!(options.introspection_debugging, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION DEBUGGING of unmanaged clusters");
                }
                if !matches!(options.introspection_interval, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
                }
                if !matches!(options.replication_factor, Unchanged) {
                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
                }
            }
        }

        match &options.workload_class {
            Set(wc) => new_config.workload_class.clone_from(wc),
            Reset => new_config.workload_class = None,
            Unchanged => {}
        }

        if new_config == config {
            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
                ObjectType::Cluster,
            )));
        }

        match (&config.variant, &new_config.variant) {
            (Managed(_), Managed(new_config_managed)) => {
                let alter_followup = self
                    .sequence_alter_cluster_managed_to_managed(
                        Some(session),
                        cluster_id,
                        new_config.clone(),
                        ReplicaCreateDropReason::Manual,
                        strategy.clone(),
                    )
                    .await?;
                if alter_followup == NeedsFinalization::Yes {
                    // For non-backgrounded zero-downtime alters, store the
                    // cluster_id in the ConnMeta to allow for cancellation.
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .insert(cluster_id.clone());
                    let new_config_managed = new_config_managed.clone();
                    return match &strategy {
                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
                                .into(),
                        )),
                        AlterClusterPlanStrategy::For(duration) => {
                            let span = Span::current();
                            let plan = plan.clone();
                            let duration = duration.to_owned();
                            let workload_class = new_config.workload_class.clone();
                            Ok(StageResult::Handle(mz_ore::task::spawn(
                                || "Finalize Alter Cluster",
                                async move {
                                    tokio::time::sleep(duration).await;
                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                        validity,
                                        plan,
                                        new_config: new_config_managed,
                                        workload_class,
                                    });
                                    Ok(Box::new(stage))
                                }
                                .instrument(span),
                            )))
                        }
                        AlterClusterPlanStrategy::UntilReady {
                            timeout,
                            on_timeout,
                        } => Ok(StageResult::Immediate(Box::new(
                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                                validity,
                                plan: plan.clone(),
                                new_config: new_config_managed.clone(),
                                workload_class: new_config.workload_class.clone(),
                                timeout_time: Instant::now() + timeout.to_owned(),
                                on_timeout: on_timeout.to_owned(),
                            }),
                        ))),
                    };
                }
            }
            (Unmanaged, Managed(_)) => {
                self.sequence_alter_cluster_unmanaged_to_managed(
                    session,
                    cluster_id,
                    new_config,
                    options.to_owned(),
                )
                .await?;
            }
            (Managed(_), Unmanaged) => {
                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
                    .await?;
            }
            (Unmanaged, Unmanaged) => {
                self.sequence_alter_cluster_unmanaged_to_unmanaged(
                    session,
                    cluster_id,
                    new_config,
                    options.replicas.clone(),
                )
                .await?;
            }
        }

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

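    /// Finalizes a zero-downtime `ALTER CLUSTER`: drops the old (non-pending)
    /// replicas, promotes the pending replicas by clearing their pending flag
    /// and stripping the `-pending` suffix from their names, and writes the
    /// new cluster config.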
    async fn finalize_alter_cluster_stage(
        &mut self,
        session: &Session,
        AlterClusterPlan {
            id: cluster_id,
            name: cluster_name,
            ..
        }: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        workload_class: Option<String>,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let mut ops = vec![];

        // Gather the ops to remove the non-pending replicas, skipping any
        // internal (e.g. billed-as-free) replicas.
        let remove_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if !r.config.location.pending() && !r.config.location.internal() {
                    Some(catalog::DropObjectInfo::ClusterReplica((
                        cluster_id.clone(),
                        r.replica_id,
                        ReplicaCreateDropReason::Manual,
                    )))
                } else {
                    None
                }
            })
            .collect();
        ops.push(catalog::Op::DropObjects(remove_replicas));

        // Gather the ops to remove the "-pending" suffix from the replica
        // names and to set `pending` to false.
        let finalize_replicas: Vec<catalog::Op> = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    let cluster_ident = match Ident::new(cluster.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing cluster name",
                                err,
                            )));
                        }
                    };
                    let replica_ident = match Ident::new(r.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing replica name",
                                err,
                            )));
                        }
                    };
                    Some(Ok((cluster_ident, replica_ident, r)))
                } else {
                    None
                }
            })
            // Collect early to surface any errors from constructing the
            // `Ident`s.
            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
            .into_iter()
            .map(|(cluster_ident, replica_ident, replica)| {
                let mut new_replica_config = replica.config.clone();
                debug!("Promoting replica: {}", replica.name);
                match new_replica_config.location {
                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
                        ref mut pending,
                        ..
                    }) => {
                        *pending = false;
                    }
                    mz_controller::clusters::ReplicaLocation::Unmanaged(_) => {}
                }

                let mut replica_ops = vec![];
                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
                if let Some(to_name) = to_name {
                    replica_ops.push(catalog::Op::RenameClusterReplica {
                        cluster_id: cluster_id.clone(),
                        replica_id: replica.replica_id.to_owned(),
                        name: QualifiedReplica {
                            cluster: cluster_ident,
                            replica: replica_ident,
                        },
                        to_name: to_name.to_owned(),
                    });
                }
                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
                    cluster_id,
                    replica_id: replica.replica_id.to_owned(),
                    config: new_replica_config,
                });
                replica_ops
            })
            .flatten()
            .collect();

        ops.extend(finalize_replicas);

        // Add the op to update the cluster state.
        ops.push(Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: ClusterConfig {
                variant: ClusterVariant::Managed(new_config),
                workload_class: workload_class.clone(),
            },
        });
        self.catalog_transact(Some(session), ops).await?;
        // Remove the cluster being altered from the ConnMeta's
        // `pending_cluster_alters` set.
        self.active_conns
            .get_mut(session.conn_id())
            .expect("There must be an active connection")
            .pending_cluster_alters
            .remove(&cluster_id);

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

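    /// Checks whether the pending replicas are hydrated yet. If they are,
    /// moves on to `Finalize`; if not, re-enters `WaitForHydrated` after a
    /// short sleep. On timeout, either rolls back by dropping the pending
    /// replicas or commits by finalizing anyway, per the configured
    /// [`OnTimeoutAction`].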
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        workload_class: Option<String>,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        // Wait, and re-enter this stage if the pending replicas are not yet
        // hydrated.
        let cluster = self.catalog.get_cluster(plan.id);
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        // Check for a timeout.
        if Instant::now() > timeout_time {
            // Timed out; handle the timeout action.
            match on_timeout {
                OnTimeoutAction::Rollback => {
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                OnTimeoutAction::Commit => {
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                                workload_class,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // We're done.
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config: new_config.clone(),
                        workload_class: workload_class.clone(),
                    })))
                } else {
                    // Check again later.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        workload_class,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }

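    /// Sequences `CREATE CLUSTER`, creating the cluster itself and, for
    /// managed clusters, its initial replicas (`r1`, `r2`, ...), one per unit
    /// of the requested replication factor.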
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster(
        &mut self,
        session: &Session,
        CreateClusterPlan {
            name,
            variant,
            workload_class,
        }: CreateClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_cluster");

        let id_ts = self.get_catalog_write_ts().await;
        let id = self.catalog().allocate_user_cluster_id(id_ts).await?;
        // The catalog items for the introspection sources are shared between all replicas
        // of a compute instance, so we create them unconditionally during instance creation.
        // Whether a replica actually maintains introspection arrangements is determined by the
        // per-replica introspection configuration.
        let introspection_sources = BUILTINS::logs().collect();
        let cluster_variant = match &variant {
            CreateClusterVariant::Managed(plan) => {
                let logging = if let Some(config) = plan.compute.introspection {
                    ReplicaLogging {
                        log_logging: config.debugging,
                        interval: Some(config.interval),
                    }
                } else {
                    ReplicaLogging::default()
                };
                ClusterVariant::Managed(ClusterVariantManaged {
                    size: plan.size.clone(),
                    availability_zones: plan.availability_zones.clone(),
                    logging,
                    replication_factor: plan.replication_factor,
                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
                    schedule: plan.schedule.clone(),
                })
            }
            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
        };
        let config = ClusterConfig {
            variant: cluster_variant,
            workload_class,
        };
        let ops = vec![catalog::Op::CreateCluster {
            id,
            name: name.clone(),
            introspection_sources,
            owner_id: *session.current_role_id(),
            config,
        }];

        match variant {
            CreateClusterVariant::Managed(plan) => {
                self.sequence_create_managed_cluster(session, plan, id, ops)
                    .await
            }
            CreateClusterVariant::Unmanaged(plan) => {
                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
                    .await
            }
        }
    }

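    /// Sequences creation of a managed cluster, validating the replica size
    /// and the `max_replicas_per_cluster` limit before allocating replica
    /// IDs.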
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_managed_cluster(
        &mut self,
        session: &Session,
        CreateClusterManagedPlan {
            availability_zones,
            compute,
            replication_factor,
            size,
            optimizer_feature_overrides: _,
            schedule: _,
        }: CreateClusterManagedPlan,
        cluster_id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_managed_cluster");

        self.ensure_valid_azs(availability_zones.iter())?;

        let role_id = session.role_metadata().current_role;
        self.catalog.ensure_valid_replica_size(
            &self
                .catalog()
                .get_role_allowed_cluster_sizes(&Some(role_id)),
            &size,
        )?;

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if cluster_id.is_user() {
            self.validate_resource_limit(
                0,
                i64::from(replication_factor),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
            self.create_managed_cluster_replica_op(
                cluster_id,
                replica_name,
                &compute,
                &size,
                &mut ops,
                if availability_zones.is_empty() {
                    None
                } else {
                    Some(availability_zones.as_ref())
                },
                false,
                *session.current_role_id(),
                ReplicaCreateDropReason::Manual,
            )?;
        }

        self.catalog_transact(Some(session), ops).await?;

        Ok(ExecuteResponse::CreatedCluster)
    }

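    /// Pushes a `CreateClusterReplica` op for a managed replica onto `ops`,
    /// after concretizing the replica location (size and availability zone)
    /// against the catalog.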
    fn create_managed_cluster_replica_op(
        &self,
        cluster_id: ClusterId,
        name: String,
        compute: &mz_sql::plan::ComputeReplicaConfig,
        size: &String,
        ops: &mut Vec<Op>,
        azs: Option<&[String]>,
        pending: bool,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    ) -> Result<(), AdapterError> {
        let location = mz_catalog::durable::ReplicaLocation::Managed {
            availability_zone: None,
            billed_as: None,
            internal: false,
            size: size.clone(),
            pending,
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
                azs,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        ops.push(catalog::Op::CreateClusterReplica {
            cluster_id,
            name,
            config,
            owner_id,
            reason,
        });
        Ok(())
    }

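    /// Returns an error if any of the given availability zones is not known
    /// to the catalog.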
    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
        &self,
        azs: I,
    ) -> Result<(), AdapterError> {
        let cat_azs = self.catalog().state().availability_zones();
        for az in azs.into_iter() {
            if !cat_azs.contains(az) {
                return Err(AdapterError::InvalidClusterReplicaAz {
                    az: az.to_string(),
                    expected: cat_azs.to_vec(),
                });
            }
        }
        Ok(())
    }

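    /// Sequences creation of an unmanaged cluster from an explicit list of
    /// replica configurations.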
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if id.is_user() {
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            // If the AZ was not specified, choose one, round-robin, from the ones with
            // the lowest number of configured replicas for this cluster.
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    computectl_addrs,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        computectl_addrs,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    internal,
                    size,
                } => {
                    // Only internal users have access to INTERNAL and BILLED AS
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    // BILLED AS implies the INTERNAL flag.
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            let role_id = session.role_metadata().current_role;
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        Ok(ExecuteResponse::CreatedCluster)
    }

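    /// Sequences `CREATE CLUSTER REPLICA`, enforcing that only internal users
    /// may specify INTERNAL or BILLED AS, and that replicas added to managed
    /// clusters are marked INTERNAL.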
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Choose default AZ if necessary
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    computectl_addrs,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                internal,
                size,
            } => {
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let role_id = session.role_metadata().current_role;
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                // Planning ensures all replicas in this codepath
                // are unmanaged.
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            // Only internal users have access to INTERNAL and BILLED AS
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            // Managed clusters require the INTERNAL flag.
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            // BILLED AS implies the INTERNAL flag.
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        // Replicas have the same owner as their cluster.
        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }

    /// When this is called by the automated cluster scheduling,
    /// `scheduling_decision_reason` should contain information on why a
    /// cluster is being turned on or off. It will be forwarded to the
    /// `details` field of the audit log event that records creating or
    /// dropping replicas.
    ///
    /// # Panics
    ///
    /// Panics if the identified cluster is not a managed cluster.
    /// Panics if `new_config` is not a configuration for a managed cluster.
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // check for active updates
        if cluster.replicas().any(|r| r.config.location.pending()) {
            return Err(AlterClusterWhilePendingReplicas);
        }

        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
        {
            self.ensure_valid_azs(new_availability_zones.iter())?;
            // If we're not doing a zero-downtime reconfiguration, tear down
            // all replicas and create new ones; otherwise create the pending
            // replicas and return early, asking for finalization.
            match strategy {
                AlterClusterPlanStrategy::None => {
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                }
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Adjust replica count down
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Adjust replica count up
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    // AVAILABILITY ZONES hasn't changed, so existing replicas don't need to be
                    // rescheduled.
                    Some(new_availability_zones.as_ref()),
                    false,
                    owner_id,
                    reason.clone(),
                )?;
            }
        }

        // If finalization is needed, finalization should update the cluster
        // config.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name: name.clone(),
                    config: new_config,
                });
            }
            NeedsFinalization::Yes => {}
        }
        self.catalog_transact(session, ops).await?;
        Ok(finalization_needed)
    }

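    /// Converts an unmanaged cluster into a managed one, validating that the
    /// existing replica names, sizes, and availability zones are consistent
    /// with the requested managed configuration.
    ///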
    /// # Panics
    ///
    /// Panics if `new_config` is not a configuration for a managed cluster.
    async fn sequence_alter_cluster_unmanaged_to_managed(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        mut new_config: ClusterConfig,
        options: PlanClusterOption,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let cluster_name = cluster.name().to_string();

        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: _,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &mut new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // Validate the replication factor parameter.
        let user_replica_count = cluster
            .user_replicas()
            .count()
            .try_into()
            .expect("must_fit");
        match options.replication_factor {
            AlterOptionParameter::Set(_) => {
                // Validate that the replication factor matches the current
                // number of replicas, but only if it was specified.
                if user_replica_count != *new_replication_factor {
                    coord_bail!(
                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
                    );
                }
            }
            _ => {
                *new_replication_factor = user_replica_count;
            }
        }

        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        // Validate the per-replica configuration.
        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());

                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            // We didn't collect any size, so the user has to specify one.
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                AlterOptionParameter::Set(_) => {} // Was set within the calling function.
            }
        } else if sizes.len() == 1 {
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

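    /// Converts a managed cluster into an unmanaged one by updating the
    /// cluster config; the existing replicas are left in place.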
    async fn sequence_alter_cluster_managed_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
    ) -> Result<(), AdapterError> {
        if !matches!(replicas, AlterOptionParameter::Unchanged) {
            coord_bail!("Cannot alter replicas in unmanaged cluster");
        }

        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    pub(crate) async fn sequence_alter_cluster_rename(
        &mut self,
        ctx: &mut ExecuteContext,
        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = Op::RenameCluster {
            id,
            name,
            to_name,
            check_reserved_names: true,
        };
        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

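    /// Swaps the names of two clusters via three renames through a temporary
    /// name, with reserved-name checking disabled for the intermediate steps.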
    pub(crate) async fn sequence_alter_cluster_swap(
        &mut self,
        ctx: &mut ExecuteContext,
        AlterClusterSwapPlan {
            id_a,
            id_b,
            name_a,
            name_b,
            name_temp,
        }: AlterClusterSwapPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op_a = Op::RenameCluster {
            id: id_a,
            name: name_a.clone(),
            to_name: name_temp.clone(),
            check_reserved_names: false,
        };
        let op_b = Op::RenameCluster {
            id: id_b,
            name: name_b.clone(),
            to_name: name_a,
            check_reserved_names: false,
        };
        let op_temp = Op::RenameCluster {
            id: id_a,
            name: name_temp,
            to_name: name_b,
            check_reserved_names: false,
        };

        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
                Box::pin(async {})
            })
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_cluster_replica_rename(
        &mut self,
        session: &Session,
        AlterClusterReplicaRenamePlan {
            cluster_id,
            replica_id,
            name,
            to_name,
        }: AlterClusterReplicaRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::RenameClusterReplica {
            cluster_id,
            replica_id,
            name,
            to_name,
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
            Err(err) => Err(err),
        }
    }

    /// Converts an [`AlterSetClusterPlan`] to a sequence of catalog operations and adjusts state.
    pub(crate) async fn sequence_alter_set_cluster(
        &self,
        _session: &Session,
        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO: This function needs to be implemented.

        // Satisfy Clippy that this is an async func.
        async {}.await;
        let entry = self.catalog().get_entry(&id);
        match entry.item().typ() {
            _ => {
                // Unexpected; the planner permitted an unsupported plan.
                Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
            }
        }
    }
}

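/// Returns the name of the `index`-th replica of a managed cluster. Replicas
/// are named `r1`, `r2`, ..., so e.g. `managed_cluster_replica_name(0)`
/// returns `"r1"`.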
fn managed_cluster_replica_name(index: u32) -> String {
    format!("r{}", index + 1)
}

/// The type of finalization needed after an operation such as
/// `sequence_alter_cluster_managed_to_managed`.
#[derive(PartialEq)]
pub(crate) enum NeedsFinalization {
    /// Pending replicas were created, so the alter must be finalized (per the
    /// chosen strategy) before the new configuration takes effect.
    Yes,
    No,
}