mz_adapter/coord/sequencer/inner/
cluster.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::time::{Duration, Instant};
12
13use itertools::Itertools;
14use maplit::btreeset;
15use mz_adapter_types::compaction::CompactionWindow;
16use mz_catalog::builtin::BUILTINS;
17use mz_catalog::memory::objects::{
18    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
19};
20use mz_compute_types::config::ComputeReplicaConfig;
21use mz_controller::clusters::{
22    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
23    ReplicaLogging,
24};
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_ore::cast::CastFrom;
27use mz_ore::instrument;
28use mz_repr::role_id::RoleId;
29use mz_sql::ast::{Ident, QualifiedReplica};
30use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, ObjectType};
31use mz_sql::plan::{
32    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
33    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
34    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
35    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
36};
37use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
38use mz_sql::session::metadata::SessionMetadata;
39use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
40use tracing::{Instrument, Span, debug};
41
42use super::return_if_err;
43use crate::catalog::{self, Op, ReplicaCreateDropReason};
44use crate::coord::{
45    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
46    Message, PlanValidity, StageResult, Staged,
47};
48use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};
49
50const PENDING_REPLICA_SUFFIX: &str = "-pending";
51
52impl Staged for ClusterStage {
53    type Ctx = ExecuteContext;
54
55    fn validity(&mut self) -> &mut PlanValidity {
56        match self {
57            Self::Alter(stage) => &mut stage.validity,
58            Self::WaitForHydrated(stage) => &mut stage.validity,
59            Self::Finalize(stage) => &mut stage.validity,
60        }
61    }
62
63    async fn stage(
64        self,
65        coord: &mut Coordinator,
66        ctx: &mut ExecuteContext,
67    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
68        match self {
69            Self::Alter(stage) => {
70                coord
71                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
72                    .await
73            }
74            Self::WaitForHydrated(stage) => {
75                let AlterClusterWaitForHydrated {
76                    validity,
77                    plan,
78                    new_config,
79                    timeout_time,
80                    on_timeout,
81                } = stage;
82                coord
83                    .check_if_pending_replicas_hydrated_stage(
84                        ctx.session(),
85                        plan,
86                        new_config,
87                        timeout_time,
88                        on_timeout,
89                        validity,
90                    )
91                    .await
92            }
93            Self::Finalize(stage) => {
94                coord
95                    .finalize_alter_cluster_stage(
96                        ctx.session(),
97                        stage.plan.clone(),
98                        stage.new_config.clone(),
99                    )
100                    .await
101            }
102        }
103    }
104
105    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
106        Message::ClusterStageReady {
107            ctx,
108            span,
109            stage: self,
110        }
111    }
112
113    fn cancel_enabled(&self) -> bool {
114        true
115    }
116}
117
118impl Coordinator {
119    #[instrument]
120    pub(crate) async fn sequence_alter_cluster_staged(
121        &mut self,
122        ctx: ExecuteContext,
123        plan: plan::AlterClusterPlan,
124    ) {
125        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
126        self.sequence_staged(ctx, Span::current(), stage).await;
127    }
128
129    #[instrument]
130    async fn alter_cluster_validate(
131        &mut self,
132        session: &Session,
133        plan: plan::AlterClusterPlan,
134    ) -> Result<ClusterStage, AdapterError> {
135        let validity = PlanValidity::new(
136            self.catalog().transient_revision(),
137            BTreeSet::new(),
138            Some(plan.id.clone()),
139            None,
140            session.role_metadata().clone(),
141        );
142        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
143    }
144
145    async fn sequence_alter_cluster_stage(
146        &mut self,
147        session: &Session,
148        plan: plan::AlterClusterPlan,
149        validity: PlanValidity,
150    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
151        let AlterClusterPlan {
152            id: cluster_id,
153            name: _,
154            ref options,
155            ref strategy,
156        } = plan;
157
158        use mz_catalog::memory::objects::ClusterVariant::*;
159        use mz_sql::plan::AlterOptionParameter::*;
160        let cluster = self.catalog.get_cluster(cluster_id);
161        let config = cluster.config.clone();
162        let mut new_config = config.clone();
163
164        match (&new_config.variant, &options.managed) {
165            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
166            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
167            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
168            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
169                // Generate a minimal correct configuration
170
171                // Size adjusted later when sequencing the actual configuration change.
172                let size = "".to_string();
173                let logging = ReplicaLogging {
174                    log_logging: false,
175                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
176                };
177                new_config.variant = Managed(ClusterVariantManaged {
178                    size,
179                    availability_zones: Default::default(),
180                    logging,
181                    replication_factor: 1,
182                    optimizer_feature_overrides: Default::default(),
183                    schedule: Default::default(),
184                });
185            }
186        }
187
188        match &mut new_config.variant {
189            Managed(ClusterVariantManaged {
190                size,
191                availability_zones,
192                logging,
193                replication_factor,
194                optimizer_feature_overrides: _,
195                schedule,
196            }) => {
197                match &options.size {
198                    Set(s) => size.clone_from(s),
199                    Reset => coord_bail!("SIZE has no default value"),
200                    Unchanged => {}
201                }
202                match &options.availability_zones {
203                    Set(az) => availability_zones.clone_from(az),
204                    Reset => *availability_zones = Default::default(),
205                    Unchanged => {}
206                }
207                match &options.introspection_debugging {
208                    Set(id) => logging.log_logging = *id,
209                    Reset => logging.log_logging = false,
210                    Unchanged => {}
211                }
212                match &options.introspection_interval {
213                    Set(ii) => logging.interval = ii.0,
214                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
215                    Unchanged => {}
216                }
217                match &options.replication_factor {
218                    Set(rf) => *replication_factor = *rf,
219                    Reset => {
220                        *replication_factor = self
221                            .catalog
222                            .system_config()
223                            .default_cluster_replication_factor()
224                    }
225                    Unchanged => {}
226                }
227                match &options.schedule {
228                    Set(new_schedule) => {
229                        *schedule = new_schedule.clone();
230                    }
231                    Reset => *schedule = Default::default(),
232                    Unchanged => {}
233                }
234                if !matches!(options.replicas, Unchanged) {
235                    coord_bail!("Cannot change REPLICAS of managed clusters");
236                }
237            }
238            Unmanaged => {
239                if !matches!(options.size, Unchanged) {
240                    coord_bail!("Cannot change SIZE of unmanaged clusters");
241                }
242                if !matches!(options.availability_zones, Unchanged) {
243                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
244                }
245                if !matches!(options.introspection_debugging, Unchanged) {
246                    coord_bail!("Cannot change INTROSPECTION DEGUBBING of unmanaged clusters");
247                }
248                if !matches!(options.introspection_interval, Unchanged) {
249                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
250                }
251                if !matches!(options.replication_factor, Unchanged) {
252                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
253                }
254            }
255        }
256
257        match &options.workload_class {
258            Set(wc) => new_config.workload_class.clone_from(wc),
259            Reset => new_config.workload_class = None,
260            Unchanged => {}
261        }
262
263        if new_config == config {
264            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
265                ObjectType::Cluster,
266            )));
267        }
268
269        let new_workload_class = new_config.workload_class.clone();
270        match (&config.variant, &new_config.variant) {
271            (Managed(_), Managed(new_config_managed)) => {
272                let alter_followup = self
273                    .sequence_alter_cluster_managed_to_managed(
274                        Some(session),
275                        cluster_id,
276                        new_config.clone(),
277                        ReplicaCreateDropReason::Manual,
278                        strategy.clone(),
279                    )
280                    .await?;
281                if alter_followup == NeedsFinalization::Yes {
282                    // For non backgrounded zero-downtime alters, store the
283                    // cluster_id in the ConnMeta to allow for cancellation.
284                    self.active_conns
285                        .get_mut(session.conn_id())
286                        .expect("There must be an active connection")
287                        .pending_cluster_alters
288                        .insert(cluster_id.clone());
289                    let new_config_managed = new_config_managed.clone();
290                    return match &strategy {
291                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
292                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
293                                .into(),
294                        )),
295                        AlterClusterPlanStrategy::For(duration) => {
296                            let span = Span::current();
297                            let plan = plan.clone();
298                            let duration = duration.clone().to_owned();
299                            Ok(StageResult::Handle(mz_ore::task::spawn(
300                                || "Finalize Alter Cluster",
301                                async move {
302                                    tokio::time::sleep(duration).await;
303                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
304                                        validity,
305                                        plan,
306                                        new_config: new_config_managed,
307                                    });
308                                    Ok(Box::new(stage))
309                                }
310                                .instrument(span),
311                            )))
312                        }
313                        AlterClusterPlanStrategy::UntilReady {
314                            timeout,
315                            on_timeout,
316                        } => Ok(StageResult::Immediate(Box::new(
317                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
318                                validity,
319                                plan: plan.clone(),
320                                new_config: new_config_managed.clone(),
321                                timeout_time: Instant::now() + timeout.to_owned(),
322                                on_timeout: on_timeout.to_owned(),
323                            }),
324                        ))),
325                    };
326                }
327            }
328            (Unmanaged, Managed(_)) => {
329                self.sequence_alter_cluster_unmanaged_to_managed(
330                    session,
331                    cluster_id,
332                    new_config,
333                    options.to_owned(),
334                )
335                .await?;
336            }
337            (Managed(_), Unmanaged) => {
338                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
339                    .await?;
340            }
341            (Unmanaged, Unmanaged) => {
342                self.sequence_alter_cluster_unmanaged_to_unmanaged(
343                    session,
344                    cluster_id,
345                    new_config,
346                    options.replicas.clone(),
347                )
348                .await?;
349            }
350        }
351
352        self.controller
353            .update_cluster_workload_class(cluster_id, new_workload_class)?;
354
355        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
356            ObjectType::Cluster,
357        )))
358    }
359
360    async fn finalize_alter_cluster_stage(
361        &mut self,
362        session: &Session,
363        AlterClusterPlan {
364            id: cluster_id,
365            name: cluster_name,
366            ..
367        }: AlterClusterPlan,
368        new_config: ClusterVariantManaged,
369    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
370        let cluster = self.catalog.get_cluster(cluster_id);
371        let workload_class = cluster.config.workload_class.clone();
372        let mut ops = vec![];
373
374        // Gather the ops to remove the non pending replicas
375        // Also skip any billed_as free replicas
376        let remove_replicas = cluster
377            .replicas()
378            .filter_map(|r| {
379                if !r.config.location.pending() && !r.config.location.internal() {
380                    Some(catalog::DropObjectInfo::ClusterReplica((
381                        cluster_id.clone(),
382                        r.replica_id,
383                        ReplicaCreateDropReason::Manual,
384                    )))
385                } else {
386                    None
387                }
388            })
389            .collect();
390        ops.push(catalog::Op::DropObjects(remove_replicas));
391
392        // Gather the Ops to remove the "-pending" suffix from the name and set
393        // pending to false
394        let finalize_replicas: Vec<catalog::Op> = cluster
395            .replicas()
396            .filter_map(|r| {
397                if r.config.location.pending() {
398                    let cluster_ident = match Ident::new(cluster.name.clone()) {
399                        Ok(id) => id,
400                        Err(err) => {
401                            return Some(Err(AdapterError::internal(
402                                "Unexpected error parsing cluster name",
403                                err,
404                            )));
405                        }
406                    };
407                    let replica_ident = match Ident::new(r.name.clone()) {
408                        Ok(id) => id,
409                        Err(err) => {
410                            return Some(Err(AdapterError::internal(
411                                "Unexpected error parsing replica name",
412                                err,
413                            )));
414                        }
415                    };
416                    Some(Ok((cluster_ident, replica_ident, r)))
417                } else {
418                    None
419                }
420            })
421            // Early collection is to handle errors from generating of the
422            // Idents
423            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
424            .into_iter()
425            .map(|(cluster_ident, replica_ident, replica)| {
426                let mut new_replica_config = replica.config.clone();
427                debug!("Promoting replica: {}", replica.name);
428                match new_replica_config.location {
429                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
430                        ref mut pending,
431                        ..
432                    }) => {
433                        *pending = false;
434                    }
435                    _ => {}
436                }
437
438                let mut replica_ops = vec![];
439                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
440                if let Some(to_name) = to_name {
441                    replica_ops.push(catalog::Op::RenameClusterReplica {
442                        cluster_id: cluster_id.clone(),
443                        replica_id: replica.replica_id.to_owned(),
444                        name: QualifiedReplica {
445                            cluster: cluster_ident,
446                            replica: replica_ident,
447                        },
448                        to_name: to_name.to_owned(),
449                    });
450                }
451                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
452                    cluster_id,
453                    replica_id: replica.replica_id.to_owned(),
454                    config: new_replica_config,
455                });
456                replica_ops
457            })
458            .flatten()
459            .collect();
460
461        ops.extend(finalize_replicas);
462
463        // Add the Op to update the cluster state
464        ops.push(Op::UpdateClusterConfig {
465            id: cluster_id,
466            name: cluster_name,
467            config: ClusterConfig {
468                variant: ClusterVariant::Managed(new_config),
469                workload_class: workload_class.clone(),
470            },
471        });
472        self.catalog_transact(Some(session), ops).await?;
473        // Remove the cluster being altered from the ConnMeta
474        // pending_cluster_alters BTreeSet
475        self.active_conns
476            .get_mut(session.conn_id())
477            .expect("There must be an active connection")
478            .pending_cluster_alters
479            .remove(&cluster_id);
480
481        self.controller
482            .update_cluster_workload_class(cluster_id, workload_class)?;
483
484        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
485            ObjectType::Cluster,
486        )))
487    }
488
489    async fn check_if_pending_replicas_hydrated_stage(
490        &mut self,
491        session: &Session,
492        plan: AlterClusterPlan,
493        new_config: ClusterVariantManaged,
494        timeout_time: Instant,
495        on_timeout: OnTimeoutAction,
496        validity: PlanValidity,
497    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
498        // wait and re-signal wait for hydrated if not hydrated
499        let cluster = self.catalog.get_cluster(plan.id);
500        let pending_replicas = cluster
501            .replicas()
502            .filter_map(|r| {
503                if r.config.location.pending() {
504                    Some(r.replica_id.clone())
505                } else {
506                    None
507                }
508            })
509            .collect_vec();
510        // Check For timeout
511        if Instant::now() > timeout_time {
512            // Timed out handle timeout action
513            match on_timeout {
514                OnTimeoutAction::Rollback => {
515                    self.active_conns
516                        .get_mut(session.conn_id())
517                        .expect("There must be an active connection")
518                        .pending_cluster_alters
519                        .remove(&cluster.id);
520                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
521                        .await?;
522                    return Err(AdapterError::AlterClusterTimeout);
523                }
524                OnTimeoutAction::Commit => {
525                    let span = Span::current();
526                    let poll_duration = self
527                        .catalog
528                        .system_config()
529                        .cluster_alter_check_ready_interval()
530                        .clone();
531                    return Ok(StageResult::Handle(mz_ore::task::spawn(
532                        || "Finalize Alter Cluster",
533                        async move {
534                            tokio::time::sleep(poll_duration).await;
535                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
536                                validity,
537                                plan,
538                                new_config,
539                            });
540                            Ok(Box::new(stage))
541                        }
542                        .instrument(span),
543                    )));
544                }
545            }
546        }
547        let compute_hydrated_fut = self
548            .controller
549            .compute
550            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
551            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
552
553        let storage_hydrated = self
554            .controller
555            .storage
556            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
557            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
558
559        let span = Span::current();
560        Ok(StageResult::Handle(mz_ore::task::spawn(
561            || "Alter Cluster: wait for hydrated",
562            async move {
563                let compute_hydrated = compute_hydrated_fut
564                    .await
565                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
566
567                if compute_hydrated && storage_hydrated {
568                    // We're done
569                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
570                        validity,
571                        plan,
572                        new_config,
573                    })))
574                } else {
575                    // Check later
576                    tokio::time::sleep(Duration::from_secs(1)).await;
577                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
578                        validity,
579                        plan,
580                        new_config,
581                        timeout_time,
582                        on_timeout,
583                    });
584                    Ok(Box::new(stage))
585                }
586            }
587            .instrument(span),
588        )))
589    }
590
591    #[mz_ore::instrument(level = "debug")]
592    pub(crate) async fn sequence_create_cluster(
593        &mut self,
594        session: &Session,
595        CreateClusterPlan {
596            name,
597            variant,
598            workload_class,
599        }: CreateClusterPlan,
600    ) -> Result<ExecuteResponse, AdapterError> {
601        tracing::debug!("sequence_create_cluster");
602
603        let id_ts = self.get_catalog_write_ts().await;
604        let id = self.catalog_mut().allocate_user_cluster_id(id_ts).await?;
605        // The catalog items for the introspection sources are shared between all replicas
606        // of a compute instance, so we create them unconditionally during instance creation.
607        // Whether a replica actually maintains introspection arrangements is determined by the
608        // per-replica introspection configuration.
609        let introspection_sources = BUILTINS::logs().collect();
610        let cluster_variant = match &variant {
611            CreateClusterVariant::Managed(plan) => {
612                let logging = if let Some(config) = plan.compute.introspection {
613                    ReplicaLogging {
614                        log_logging: config.debugging,
615                        interval: Some(config.interval),
616                    }
617                } else {
618                    ReplicaLogging::default()
619                };
620                ClusterVariant::Managed(ClusterVariantManaged {
621                    size: plan.size.clone(),
622                    availability_zones: plan.availability_zones.clone(),
623                    logging,
624                    replication_factor: plan.replication_factor,
625                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
626                    schedule: plan.schedule.clone(),
627                })
628            }
629            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
630        };
631        let config = ClusterConfig {
632            variant: cluster_variant,
633            workload_class,
634        };
635        let ops = vec![catalog::Op::CreateCluster {
636            id,
637            name: name.clone(),
638            introspection_sources,
639            owner_id: *session.current_role_id(),
640            config,
641        }];
642
643        match variant {
644            CreateClusterVariant::Managed(plan) => {
645                self.sequence_create_managed_cluster(session, plan, id, ops)
646                    .await
647            }
648            CreateClusterVariant::Unmanaged(plan) => {
649                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
650                    .await
651            }
652        }
653    }
654
655    #[mz_ore::instrument(level = "debug")]
656    async fn sequence_create_managed_cluster(
657        &mut self,
658        session: &Session,
659        CreateClusterManagedPlan {
660            availability_zones,
661            compute,
662            replication_factor,
663            size,
664            optimizer_feature_overrides: _,
665            schedule: _,
666        }: CreateClusterManagedPlan,
667        cluster_id: ClusterId,
668        mut ops: Vec<catalog::Op>,
669    ) -> Result<ExecuteResponse, AdapterError> {
670        tracing::debug!("sequence_create_managed_cluster");
671
672        self.ensure_valid_azs(availability_zones.iter())?;
673
674        let role_id = session.role_metadata().current_role;
675        self.catalog.ensure_valid_replica_size(
676            &self
677                .catalog()
678                .get_role_allowed_cluster_sizes(&Some(role_id)),
679            &size,
680        )?;
681
682        // Eagerly validate the `max_replicas_per_cluster` limit.
683        // `catalog_transact` will do this validation too, but allocating
684        // replica IDs is expensive enough that we need to do this validation
685        // before allocating replica IDs. See database-issues#6046.
686        if cluster_id.is_user() {
687            self.validate_resource_limit(
688                0,
689                i64::from(replication_factor),
690                SystemVars::max_replicas_per_cluster,
691                "cluster replica",
692                MAX_REPLICAS_PER_CLUSTER.name(),
693            )?;
694        }
695
696        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
697            self.create_managed_cluster_replica_op(
698                cluster_id,
699                replica_name,
700                &compute,
701                &size,
702                &mut ops,
703                if availability_zones.is_empty() {
704                    None
705                } else {
706                    Some(availability_zones.as_ref())
707                },
708                false,
709                *session.current_role_id(),
710                ReplicaCreateDropReason::Manual,
711            )?;
712        }
713
714        self.catalog_transact(Some(session), ops).await?;
715
716        self.create_cluster(cluster_id).await;
717
718        Ok(ExecuteResponse::CreatedCluster)
719    }
720
721    fn create_managed_cluster_replica_op(
722        &self,
723        cluster_id: ClusterId,
724        name: String,
725        compute: &mz_sql::plan::ComputeReplicaConfig,
726        size: &String,
727        ops: &mut Vec<Op>,
728        azs: Option<&[String]>,
729        pending: bool,
730        owner_id: RoleId,
731        reason: ReplicaCreateDropReason,
732    ) -> Result<(), AdapterError> {
733        let location = mz_catalog::durable::ReplicaLocation::Managed {
734            availability_zone: None,
735            billed_as: None,
736            internal: false,
737            size: size.clone(),
738            pending,
739        };
740
741        let logging = if let Some(config) = compute.introspection {
742            ReplicaLogging {
743                log_logging: config.debugging,
744                interval: Some(config.interval),
745            }
746        } else {
747            ReplicaLogging::default()
748        };
749
750        let config = ReplicaConfig {
751            location: self.catalog().concretize_replica_location(
752                location,
753                &self
754                    .catalog()
755                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
756                azs,
757            )?,
758            compute: ComputeReplicaConfig { logging },
759        };
760
761        ops.push(catalog::Op::CreateClusterReplica {
762            cluster_id,
763            name,
764            config,
765            owner_id,
766            reason,
767        });
768        Ok(())
769    }
770
771    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
772        &self,
773        azs: I,
774    ) -> Result<(), AdapterError> {
775        let cat_azs = self.catalog().state().availability_zones();
776        for az in azs.into_iter() {
777            if !cat_azs.contains(az) {
778                return Err(AdapterError::InvalidClusterReplicaAz {
779                    az: az.to_string(),
780                    expected: cat_azs.to_vec(),
781                });
782            }
783        }
784        Ok(())
785    }
786
787    #[mz_ore::instrument(level = "debug")]
788    async fn sequence_create_unmanaged_cluster(
789        &mut self,
790        session: &Session,
791        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
792        id: ClusterId,
793        mut ops: Vec<catalog::Op>,
794    ) -> Result<ExecuteResponse, AdapterError> {
795        tracing::debug!("sequence_create_unmanaged_cluster");
796
797        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
798            if let mz_sql::plan::ReplicaConfig::Orchestrated {
799                availability_zone: Some(az),
800                ..
801            } = &r
802            {
803                Some(az)
804            } else {
805                None
806            }
807        }))?;
808
809        // Eagerly validate the `max_replicas_per_cluster` limit.
810        // `catalog_transact` will do this validation too, but allocating
811        // replica IDs is expensive enough that we need to do this validation
812        // before allocating replica IDs. See database-issues#6046.
813        if id.is_user() {
814            self.validate_resource_limit(
815                0,
816                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
817                SystemVars::max_replicas_per_cluster,
818                "cluster replica",
819                MAX_REPLICAS_PER_CLUSTER.name(),
820            )?;
821        }
822
823        for (replica_name, replica_config) in replicas {
824            // If the AZ was not specified, choose one, round-robin, from the ones with
825            // the lowest number of configured replicas for this cluster.
826            let (compute, location) = match replica_config {
827                mz_sql::plan::ReplicaConfig::Unorchestrated {
828                    storagectl_addrs,
829                    computectl_addrs,
830                    compute,
831                } => {
832                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
833                        storagectl_addrs,
834                        computectl_addrs,
835                    };
836                    (compute, location)
837                }
838                mz_sql::plan::ReplicaConfig::Orchestrated {
839                    availability_zone,
840                    billed_as,
841                    compute,
842                    internal,
843                    size,
844                } => {
845                    // Only internal users have access to INTERNAL and BILLED AS
846                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
847                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
848                    }
849                    // BILLED AS implies the INTERNAL flag.
850                    if billed_as.is_some() && !internal {
851                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
852                    }
853
854                    let location = mz_catalog::durable::ReplicaLocation::Managed {
855                        availability_zone,
856                        billed_as,
857                        internal,
858                        size: size.clone(),
859                        pending: false,
860                    };
861                    (compute, location)
862                }
863            };
864
865            let logging = if let Some(config) = compute.introspection {
866                ReplicaLogging {
867                    log_logging: config.debugging,
868                    interval: Some(config.interval),
869                }
870            } else {
871                ReplicaLogging::default()
872            };
873
874            let role_id = session.role_metadata().current_role;
875            let config = ReplicaConfig {
876                location: self.catalog().concretize_replica_location(
877                    location,
878                    &self
879                        .catalog()
880                        .get_role_allowed_cluster_sizes(&Some(role_id)),
881                    None,
882                )?,
883                compute: ComputeReplicaConfig { logging },
884            };
885
886            ops.push(catalog::Op::CreateClusterReplica {
887                cluster_id: id,
888                name: replica_name.clone(),
889                config,
890                owner_id: *session.current_role_id(),
891                reason: ReplicaCreateDropReason::Manual,
892            });
893        }
894
895        self.catalog_transact(Some(session), ops).await?;
896
897        self.create_cluster(id).await;
898
899        Ok(ExecuteResponse::CreatedCluster)
900    }
901
902    async fn create_cluster(&mut self, cluster_id: ClusterId) {
903        let Coordinator {
904            catalog,
905            controller,
906            ..
907        } = self;
908        let cluster = catalog.get_cluster(cluster_id);
909        let cluster_id = cluster.id;
910        let introspection_source_ids: Vec<_> =
911            cluster.log_indexes.iter().map(|(_, id)| *id).collect();
912
913        controller
914            .create_cluster(
915                cluster_id,
916                mz_controller::clusters::ClusterConfig {
917                    arranged_logs: cluster.log_indexes.clone(),
918                    workload_class: cluster.config.workload_class.clone(),
919                },
920            )
921            .expect("creating cluster must not fail");
922
923        let replica_ids: Vec<_> = cluster
924            .replicas()
925            .map(|r| (r.replica_id, format!("{}.{}", cluster.name(), &r.name)))
926            .collect();
927        for (replica_id, replica_name) in replica_ids {
928            self.create_cluster_replica(cluster_id, replica_id, replica_name)
929                .await;
930        }
931
932        if !introspection_source_ids.is_empty() {
933            self.initialize_compute_read_policies(
934                introspection_source_ids,
935                cluster_id,
936                CompactionWindow::Default,
937            )
938            .await;
939        }
940    }
941
942    #[mz_ore::instrument(level = "debug")]
943    pub(crate) async fn sequence_create_cluster_replica(
944        &mut self,
945        session: &Session,
946        CreateClusterReplicaPlan {
947            name,
948            cluster_id,
949            config,
950        }: CreateClusterReplicaPlan,
951    ) -> Result<ExecuteResponse, AdapterError> {
952        // Choose default AZ if necessary
953        let (compute, location) = match config {
954            mz_sql::plan::ReplicaConfig::Unorchestrated {
955                storagectl_addrs,
956                computectl_addrs,
957                compute,
958            } => {
959                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
960                    storagectl_addrs,
961                    computectl_addrs,
962                };
963                (compute, location)
964            }
965            mz_sql::plan::ReplicaConfig::Orchestrated {
966                availability_zone,
967                billed_as,
968                compute,
969                internal,
970                size,
971            } => {
972                let availability_zone = match availability_zone {
973                    Some(az) => {
974                        self.ensure_valid_azs([&az])?;
975                        Some(az)
976                    }
977                    None => None,
978                };
979                let location = mz_catalog::durable::ReplicaLocation::Managed {
980                    availability_zone,
981                    billed_as,
982                    internal,
983                    size,
984                    pending: false,
985                };
986                (compute, location)
987            }
988        };
989
990        let logging = if let Some(config) = compute.introspection {
991            ReplicaLogging {
992                log_logging: config.debugging,
993                interval: Some(config.interval),
994            }
995        } else {
996            ReplicaLogging::default()
997        };
998
999        let role_id = session.role_metadata().current_role;
1000        let config = ReplicaConfig {
1001            location: self.catalog().concretize_replica_location(
1002                location,
1003                &self
1004                    .catalog()
1005                    .get_role_allowed_cluster_sizes(&Some(role_id)),
1006                // Planning ensures all replicas in this codepath
1007                // are unmanaged.
1008                None,
1009            )?,
1010            compute: ComputeReplicaConfig { logging },
1011        };
1012
1013        let cluster = self.catalog().get_cluster(cluster_id);
1014
1015        if let ReplicaLocation::Managed(ManagedReplicaLocation {
1016            internal,
1017            billed_as,
1018            ..
1019        }) = &config.location
1020        {
1021            // Only internal users have access to INTERNAL and BILLED AS
1022            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
1023                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
1024            }
1025            // Managed clusters require the INTERNAL flag.
1026            if cluster.is_managed() && !*internal {
1027                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
1028            }
1029            // BILLED AS implies the INTERNAL flag.
1030            if billed_as.is_some() && !*internal {
1031                coord_bail!("must specify INTERNAL when specifying BILLED AS");
1032            }
1033        }
1034
1035        // Replicas have the same owner as their cluster.
1036        let owner_id = cluster.owner_id();
1037        let op = catalog::Op::CreateClusterReplica {
1038            cluster_id,
1039            name: name.clone(),
1040            config,
1041            owner_id,
1042            reason: ReplicaCreateDropReason::Manual,
1043        };
1044
1045        self.catalog_transact(Some(session), vec![op]).await?;
1046
1047        let id = self
1048            .catalog()
1049            .resolve_replica_in_cluster(&cluster_id, &name)
1050            .expect("just created")
1051            .replica_id();
1052
1053        self.create_cluster_replica(cluster_id, id, name).await;
1054
1055        Ok(ExecuteResponse::CreatedClusterReplica)
1056    }
1057
1058    async fn create_cluster_replica(
1059        &mut self,
1060        cluster_id: ClusterId,
1061        replica_id: ReplicaId,
1062        replica_name: String,
1063    ) {
1064        let cluster = self.catalog().get_cluster(cluster_id);
1065        let role = cluster.role();
1066        let replica_config = cluster
1067            .replica(replica_id)
1068            .expect("known to exist")
1069            .config
1070            .clone();
1071
1072        let enable_worker_core_affinity =
1073            self.catalog().system_config().enable_worker_core_affinity();
1074
1075        self.controller
1076            .create_replica(
1077                cluster_id,
1078                replica_id,
1079                cluster.name.to_owned(),
1080                replica_name,
1081                role,
1082                replica_config,
1083                enable_worker_core_affinity,
1084            )
1085            .expect("creating replicas must not fail");
1086
1087        self.install_introspection_subscribes(cluster_id, replica_id)
1088            .await;
1089    }
1090
1091    /// When this is called by the automated cluster scheduling, `scheduling_decision_reason` should
1092    /// contain information on why is a cluster being turned On/Off. It will be forwarded to the
1093    /// `details` field of the audit log event that records creating or dropping replicas.
1094    ///
1095    /// # Panics
1096    ///
1097    /// Panics if the identified cluster is not a managed cluster.
1098    /// Panics if `new_config` is not a configuration for a managed cluster.
1099    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
1100        &mut self,
1101        session: Option<&Session>,
1102        cluster_id: ClusterId,
1103        new_config: ClusterConfig,
1104        reason: ReplicaCreateDropReason,
1105        strategy: AlterClusterPlanStrategy,
1106    ) -> Result<NeedsFinalization, AdapterError> {
1107        let cluster = self.catalog.get_cluster(cluster_id);
1108        let name = cluster.name().to_string();
1109        let owner_id = cluster.owner_id();
1110
1111        let mut ops = vec![];
1112        let mut create_cluster_replicas = vec![];
1113        let mut finalization_needed = NeedsFinalization::No;
1114
1115        let ClusterVariant::Managed(ClusterVariantManaged {
1116            size,
1117            availability_zones,
1118            logging,
1119            replication_factor,
1120            optimizer_feature_overrides: _,
1121            schedule: _,
1122        }) = &cluster.config.variant
1123        else {
1124            panic!("expected existing managed cluster config");
1125        };
1126        let ClusterVariant::Managed(ClusterVariantManaged {
1127            size: new_size,
1128            replication_factor: new_replication_factor,
1129            availability_zones: new_availability_zones,
1130            logging: new_logging,
1131            optimizer_feature_overrides: _,
1132            schedule: _,
1133        }) = &new_config.variant
1134        else {
1135            panic!("expected new managed cluster config");
1136        };
1137
1138        let role_id = session.map(|s| s.role_metadata().current_role);
1139        self.catalog.ensure_valid_replica_size(
1140            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
1141            new_size,
1142        )?;
1143
1144        // check for active updates
1145        if cluster.replicas().any(|r| r.config.location.pending()) {
1146            coord_bail!("Cannot Alter clusters with pending updates.")
1147        }
1148
1149        let compute = mz_sql::plan::ComputeReplicaConfig {
1150            introspection: new_logging
1151                .interval
1152                .map(|interval| ComputeReplicaIntrospectionConfig {
1153                    debugging: new_logging.log_logging,
1154                    interval,
1155                }),
1156        };
1157
1158        // Eagerly validate the `max_replicas_per_cluster` limit.
1159        // `catalog_transact` will do this validation too, but allocating
1160        // replica IDs is expensive enough that we need to do this validation
1161        // before allocating replica IDs. See database-issues#6046.
1162        if new_replication_factor > replication_factor {
1163            if cluster_id.is_user() {
1164                self.validate_resource_limit(
1165                    usize::cast_from(*replication_factor),
1166                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
1167                    SystemVars::max_replicas_per_cluster,
1168                    "cluster replica",
1169                    MAX_REPLICAS_PER_CLUSTER.name(),
1170                )?;
1171            }
1172        }
1173
1174        if new_size != size
1175            || new_availability_zones != availability_zones
1176            || new_logging != logging
1177        {
1178            self.ensure_valid_azs(new_availability_zones.iter())?;
1179            // If we're not doing a zero-downtime reconfig tear down all
1180            // replicas, create new ones else create the pending replicas and
1181            // return early asking for finalization
1182            match strategy {
1183                AlterClusterPlanStrategy::None => {
1184                    let replica_ids_and_reasons = (0..*replication_factor)
1185                        .map(managed_cluster_replica_name)
1186                        .filter_map(|name| cluster.replica_id(&name))
1187                        .map(|replica_id| {
1188                            catalog::DropObjectInfo::ClusterReplica((
1189                                cluster.id(),
1190                                replica_id,
1191                                reason.clone(),
1192                            ))
1193                        })
1194                        .collect();
1195                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
1196                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
1197                        self.create_managed_cluster_replica_op(
1198                            cluster_id,
1199                            name.clone(),
1200                            &compute,
1201                            new_size,
1202                            &mut ops,
1203                            Some(new_availability_zones.as_ref()),
1204                            false,
1205                            owner_id,
1206                            reason.clone(),
1207                        )?;
1208                        create_cluster_replicas.push((cluster_id, name));
1209                    }
1210                }
1211                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
1212                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
1213                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
1214                        self.create_managed_cluster_replica_op(
1215                            cluster_id,
1216                            name.clone(),
1217                            &compute,
1218                            new_size,
1219                            &mut ops,
1220                            Some(new_availability_zones.as_ref()),
1221                            true,
1222                            owner_id,
1223                            reason.clone(),
1224                        )?;
1225                        create_cluster_replicas.push((cluster_id, name));
1226                    }
1227                    finalization_needed = NeedsFinalization::Yes;
1228                }
1229            }
1230        } else if new_replication_factor < replication_factor {
1231            // Adjust replica count down
1232            let replica_ids = (*new_replication_factor..*replication_factor)
1233                .map(managed_cluster_replica_name)
1234                .filter_map(|name| cluster.replica_id(&name))
1235                .map(|replica_id| {
1236                    catalog::DropObjectInfo::ClusterReplica((
1237                        cluster.id(),
1238                        replica_id,
1239                        reason.clone(),
1240                    ))
1241                })
1242                .collect();
1243            ops.push(catalog::Op::DropObjects(replica_ids));
1244        } else if new_replication_factor > replication_factor {
1245            // Adjust replica count up
1246            for name in
1247                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
1248            {
1249                self.create_managed_cluster_replica_op(
1250                    cluster_id,
1251                    name.clone(),
1252                    &compute,
1253                    new_size,
1254                    &mut ops,
1255                    // AVAILABILITY ZONES hasn't changed, so existing replicas don't need to be
1256                    // rescheduled.
1257                    Some(new_availability_zones.as_ref()),
1258                    false,
1259                    owner_id,
1260                    reason.clone(),
1261                )?;
1262                create_cluster_replicas.push((cluster_id, name))
1263            }
1264        }
1265
1266        // If finalization is needed, finalization should update the cluster
1267        // config.
1268        match finalization_needed {
1269            NeedsFinalization::No => {
1270                ops.push(catalog::Op::UpdateClusterConfig {
1271                    id: cluster_id,
1272                    name: name.clone(),
1273                    config: new_config,
1274                });
1275            }
1276            _ => {}
1277        }
1278        self.catalog_transact(session, ops.clone()).await?;
1279        for (cluster_id, replica_name) in create_cluster_replicas {
1280            let replica_id = self
1281                .catalog()
1282                .resolve_replica_in_cluster(&cluster_id, &replica_name)
1283                .expect("just created")
1284                .replica_id();
1285            self.create_cluster_replica(cluster_id, replica_id, replica_name)
1286                .await;
1287        }
1288        Ok(finalization_needed)
1289    }
1290
1291    /// # Panics
1292    ///
1293    /// Panics if `new_config` is not a configuration for a managed cluster.
1294    async fn sequence_alter_cluster_unmanaged_to_managed(
1295        &mut self,
1296        session: &Session,
1297        cluster_id: ClusterId,
1298        mut new_config: ClusterConfig,
1299        options: PlanClusterOption,
1300    ) -> Result<(), AdapterError> {
1301        let cluster = self.catalog.get_cluster(cluster_id);
1302        let cluster_name = cluster.name().to_string();
1303
1304        let ClusterVariant::Managed(ClusterVariantManaged {
1305            size: new_size,
1306            replication_factor: new_replication_factor,
1307            availability_zones: new_availability_zones,
1308            logging: _,
1309            optimizer_feature_overrides: _,
1310            schedule: _,
1311        }) = &mut new_config.variant
1312        else {
1313            panic!("expected new managed cluster config");
1314        };
1315
1316        // Validate replication factor parameter
1317        let user_replica_count = cluster
1318            .user_replicas()
1319            .count()
1320            .try_into()
1321            .expect("must_fit");
1322        match options.replication_factor {
1323            AlterOptionParameter::Set(_) => {
1324                // Validate that the replication factor matches the current length only if specified.
1325                if user_replica_count != *new_replication_factor {
1326                    coord_bail!(
1327                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
1328                    );
1329                }
1330            }
1331            _ => {
1332                *new_replication_factor = user_replica_count;
1333            }
1334        }
1335
1336        let mut names = BTreeSet::new();
1337        let mut sizes = BTreeSet::new();
1338
1339        self.ensure_valid_azs(new_availability_zones.iter())?;
1340
1341        // Validate per-replica configuration
1342        for replica in cluster.user_replicas() {
1343            names.insert(replica.name.clone());
1344            match &replica.config.location {
1345                ReplicaLocation::Unmanaged(_) => coord_bail!(
1346                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
1347                ),
1348                ReplicaLocation::Managed(location) => {
1349                    sizes.insert(location.size.clone());
1350
1351                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
1352                        &location.availability_zones
1353                    {
1354                        if !new_availability_zones.contains(az) {
1355                            coord_bail!(
1356                                "unmanaged replica has availability zone {az} which is not \
1357                                in managed {new_availability_zones:?}"
1358                            )
1359                        }
1360                    }
1361                }
1362            }
1363        }
1364
1365        if sizes.is_empty() {
1366            assert!(
1367                cluster.user_replicas().next().is_none(),
1368                "Cluster should not have replicas"
1369            );
1370            // We didn't collect any size, so the user has to name it.
1371            match &options.size {
1372                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
1373                    coord_bail!("Missing SIZE for empty cluster")
1374                }
1375                _ => {} // Was set within the calling function.
1376            }
1377        } else if sizes.len() == 1 {
1378            let size = sizes.into_iter().next().expect("must exist");
1379            match &options.size {
1380                AlterOptionParameter::Set(sz) if *sz != size => {
1381                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
1382                }
1383                _ => *new_size = size,
1384            }
1385        } else {
1386            let formatted = sizes
1387                .iter()
1388                .map(String::as_str)
1389                .collect::<Vec<_>>()
1390                .join(", ");
1391            coord_bail!(
1392                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
1393            );
1394        }
1395
1396        for i in 0..*new_replication_factor {
1397            let name = managed_cluster_replica_name(i);
1398            names.remove(&name);
1399        }
1400        if !names.is_empty() {
1401            let formatted = names
1402                .iter()
1403                .map(String::as_str)
1404                .collect::<Vec<_>>()
1405                .join(", ");
1406            coord_bail!(
1407                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
1408            );
1409        }
1410
1411        let ops = vec![catalog::Op::UpdateClusterConfig {
1412            id: cluster_id,
1413            name: cluster_name,
1414            config: new_config,
1415        }];
1416
1417        self.catalog_transact(Some(session), ops).await?;
1418        Ok(())
1419    }
1420
1421    async fn sequence_alter_cluster_managed_to_unmanaged(
1422        &mut self,
1423        session: &Session,
1424        cluster_id: ClusterId,
1425        new_config: ClusterConfig,
1426    ) -> Result<(), AdapterError> {
1427        let cluster = self.catalog().get_cluster(cluster_id);
1428
1429        let ops = vec![catalog::Op::UpdateClusterConfig {
1430            id: cluster_id,
1431            name: cluster.name().to_string(),
1432            config: new_config,
1433        }];
1434
1435        self.catalog_transact(Some(session), ops).await?;
1436        Ok(())
1437    }
1438
1439    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
1440        &mut self,
1441        session: &Session,
1442        cluster_id: ClusterId,
1443        new_config: ClusterConfig,
1444        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
1445    ) -> Result<(), AdapterError> {
1446        if !matches!(replicas, AlterOptionParameter::Unchanged) {
1447            coord_bail!("Cannot alter replicas in unmanaged cluster");
1448        }
1449
1450        let cluster = self.catalog().get_cluster(cluster_id);
1451
1452        let ops = vec![catalog::Op::UpdateClusterConfig {
1453            id: cluster_id,
1454            name: cluster.name().to_string(),
1455            config: new_config,
1456        }];
1457
1458        self.catalog_transact(Some(session), ops).await?;
1459        Ok(())
1460    }
1461
1462    pub(crate) async fn sequence_alter_cluster_rename(
1463        &mut self,
1464        ctx: &mut ExecuteContext,
1465        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
1466    ) -> Result<ExecuteResponse, AdapterError> {
1467        let op = Op::RenameCluster {
1468            id,
1469            name,
1470            to_name,
1471            check_reserved_names: true,
1472        };
1473        match self
1474            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
1475            .await
1476        {
1477            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1478            Err(err) => Err(err),
1479        }
1480    }
1481
1482    pub(crate) async fn sequence_alter_cluster_swap(
1483        &mut self,
1484        ctx: &mut ExecuteContext,
1485        AlterClusterSwapPlan {
1486            id_a,
1487            id_b,
1488            name_a,
1489            name_b,
1490            name_temp,
1491        }: AlterClusterSwapPlan,
1492    ) -> Result<ExecuteResponse, AdapterError> {
1493        let op_a = Op::RenameCluster {
1494            id: id_a,
1495            name: name_a.clone(),
1496            to_name: name_temp.clone(),
1497            check_reserved_names: false,
1498        };
1499        let op_b = Op::RenameCluster {
1500            id: id_b,
1501            name: name_b.clone(),
1502            to_name: name_a,
1503            check_reserved_names: false,
1504        };
1505        let op_temp = Op::RenameCluster {
1506            id: id_a,
1507            name: name_temp,
1508            to_name: name_b,
1509            check_reserved_names: false,
1510        };
1511
1512        match self
1513            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
1514                Box::pin(async {})
1515            })
1516            .await
1517        {
1518            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1519            Err(err) => Err(err),
1520        }
1521    }
1522
1523    pub(crate) async fn sequence_alter_cluster_replica_rename(
1524        &mut self,
1525        session: &Session,
1526        AlterClusterReplicaRenamePlan {
1527            cluster_id,
1528            replica_id,
1529            name,
1530            to_name,
1531        }: AlterClusterReplicaRenamePlan,
1532    ) -> Result<ExecuteResponse, AdapterError> {
1533        let op = catalog::Op::RenameClusterReplica {
1534            cluster_id,
1535            replica_id,
1536            name,
1537            to_name,
1538        };
1539        match self.catalog_transact(Some(session), vec![op]).await {
1540            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
1541            Err(err) => Err(err),
1542        }
1543    }
1544
1545    /// Convert a [`AlterSetClusterPlan`] to a sequence of catalog operators and adjust state.
1546    pub(crate) async fn sequence_alter_set_cluster(
1547        &self,
1548        _session: &Session,
1549        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
1550    ) -> Result<ExecuteResponse, AdapterError> {
1551        // TODO: This function needs to be implemented.
1552
1553        // Satisfy Clippy that this is an async func.
1554        async {}.await;
1555        let entry = self.catalog().get_entry(&id);
1556        match entry.item().typ() {
1557            _ => {
1558                // Unexpected; planner permitted unsupported plan.
1559                Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
1560            }
1561        }
1562    }
1563}
1564
1565fn managed_cluster_replica_name(index: u32) -> String {
1566    format!("r{}", index + 1)
1567}
1568
1569/// The type of finalization needed after an
1570/// operation such as alter_cluster_managed_to_managed.
1571#[derive(PartialEq)]
1572pub(crate) enum NeedsFinalization {
1573    /// Wait for the provided duration before finalizing
1574    Yes,
1575    No,
1576}