mz_adapter/coord/sequencer/inner/
cluster.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::time::{Duration, Instant};
12
13use itertools::Itertools;
14use maplit::btreeset;
15use mz_adapter_types::compaction::CompactionWindow;
16use mz_catalog::builtin::BUILTINS;
17use mz_catalog::memory::objects::{
18    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
19};
20use mz_compute_types::config::ComputeReplicaConfig;
21use mz_controller::clusters::{
22    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
23    ReplicaLogging,
24};
25use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
26use mz_ore::cast::CastFrom;
27use mz_ore::instrument;
28use mz_repr::role_id::RoleId;
29use mz_sql::ast::{Ident, QualifiedReplica};
30use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, ObjectType};
31use mz_sql::plan::{
32    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
33    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
34    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
35    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
36};
37use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
38use mz_sql::session::metadata::SessionMetadata;
39use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
40use tracing::{Instrument, Span, debug};
41
42use super::return_if_err;
43use crate::AdapterError::AlterClusterWhilePendingReplicas;
44use crate::catalog::{self, Op, ReplicaCreateDropReason};
45use crate::coord::{
46    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
47    Message, PlanValidity, StageResult, Staged,
48};
49use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};
50
/// Name suffix that `finalize_alter_cluster_stage` strips from the names of
/// pending replicas when it promotes them.
const PENDING_REPLICA_SUFFIX: &str = "-pending";
52
53impl Staged for ClusterStage {
54    type Ctx = ExecuteContext;
55
56    fn validity(&mut self) -> &mut PlanValidity {
57        match self {
58            Self::Alter(stage) => &mut stage.validity,
59            Self::WaitForHydrated(stage) => &mut stage.validity,
60            Self::Finalize(stage) => &mut stage.validity,
61        }
62    }
63
64    async fn stage(
65        self,
66        coord: &mut Coordinator,
67        ctx: &mut ExecuteContext,
68    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
69        match self {
70            Self::Alter(stage) => {
71                coord
72                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
73                    .await
74            }
75            Self::WaitForHydrated(stage) => {
76                let AlterClusterWaitForHydrated {
77                    validity,
78                    plan,
79                    new_config,
80                    timeout_time,
81                    on_timeout,
82                } = stage;
83                coord
84                    .check_if_pending_replicas_hydrated_stage(
85                        ctx.session(),
86                        plan,
87                        new_config,
88                        timeout_time,
89                        on_timeout,
90                        validity,
91                    )
92                    .await
93            }
94            Self::Finalize(stage) => {
95                coord
96                    .finalize_alter_cluster_stage(
97                        ctx.session(),
98                        stage.plan.clone(),
99                        stage.new_config.clone(),
100                    )
101                    .await
102            }
103        }
104    }
105
106    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
107        Message::ClusterStageReady {
108            ctx,
109            span,
110            stage: self,
111        }
112    }
113
114    fn cancel_enabled(&self) -> bool {
115        true
116    }
117}
118
119impl Coordinator {
120    #[instrument]
121    pub(crate) async fn sequence_alter_cluster_staged(
122        &mut self,
123        ctx: ExecuteContext,
124        plan: plan::AlterClusterPlan,
125    ) {
126        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
127        self.sequence_staged(ctx, Span::current(), stage).await;
128    }
129
130    #[instrument]
131    async fn alter_cluster_validate(
132        &mut self,
133        session: &Session,
134        plan: plan::AlterClusterPlan,
135    ) -> Result<ClusterStage, AdapterError> {
136        let validity = PlanValidity::new(
137            self.catalog().transient_revision(),
138            BTreeSet::new(),
139            Some(plan.id.clone()),
140            None,
141            session.role_metadata().clone(),
142        );
143        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
144    }
145
146    async fn sequence_alter_cluster_stage(
147        &mut self,
148        session: &Session,
149        plan: plan::AlterClusterPlan,
150        validity: PlanValidity,
151    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
152        let AlterClusterPlan {
153            id: cluster_id,
154            name: _,
155            ref options,
156            ref strategy,
157        } = plan;
158
159        use mz_catalog::memory::objects::ClusterVariant::*;
160        use mz_sql::plan::AlterOptionParameter::*;
161        let cluster = self.catalog.get_cluster(cluster_id);
162        let config = cluster.config.clone();
163        let mut new_config = config.clone();
164
165        match (&new_config.variant, &options.managed) {
166            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
167            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
168            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
169            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
170                // Generate a minimal correct configuration
171
172                // Size adjusted later when sequencing the actual configuration change.
173                let size = "".to_string();
174                let logging = ReplicaLogging {
175                    log_logging: false,
176                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
177                };
178                new_config.variant = Managed(ClusterVariantManaged {
179                    size,
180                    availability_zones: Default::default(),
181                    logging,
182                    replication_factor: 1,
183                    optimizer_feature_overrides: Default::default(),
184                    schedule: Default::default(),
185                });
186            }
187        }
188
189        match &mut new_config.variant {
190            Managed(ClusterVariantManaged {
191                size,
192                availability_zones,
193                logging,
194                replication_factor,
195                optimizer_feature_overrides: _,
196                schedule,
197            }) => {
198                match &options.size {
199                    Set(s) => size.clone_from(s),
200                    Reset => coord_bail!("SIZE has no default value"),
201                    Unchanged => {}
202                }
203                match &options.availability_zones {
204                    Set(az) => availability_zones.clone_from(az),
205                    Reset => *availability_zones = Default::default(),
206                    Unchanged => {}
207                }
208                match &options.introspection_debugging {
209                    Set(id) => logging.log_logging = *id,
210                    Reset => logging.log_logging = false,
211                    Unchanged => {}
212                }
213                match &options.introspection_interval {
214                    Set(ii) => logging.interval = ii.0,
215                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
216                    Unchanged => {}
217                }
218                match &options.replication_factor {
219                    Set(rf) => *replication_factor = *rf,
220                    Reset => {
221                        *replication_factor = self
222                            .catalog
223                            .system_config()
224                            .default_cluster_replication_factor()
225                    }
226                    Unchanged => {}
227                }
228                match &options.schedule {
229                    Set(new_schedule) => {
230                        *schedule = new_schedule.clone();
231                    }
232                    Reset => *schedule = Default::default(),
233                    Unchanged => {}
234                }
235                if !matches!(options.replicas, Unchanged) {
236                    coord_bail!("Cannot change REPLICAS of managed clusters");
237                }
238            }
239            Unmanaged => {
240                if !matches!(options.size, Unchanged) {
241                    coord_bail!("Cannot change SIZE of unmanaged clusters");
242                }
243                if !matches!(options.availability_zones, Unchanged) {
244                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
245                }
246                if !matches!(options.introspection_debugging, Unchanged) {
247                    coord_bail!("Cannot change INTROSPECTION DEGUBBING of unmanaged clusters");
248                }
249                if !matches!(options.introspection_interval, Unchanged) {
250                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
251                }
252                if !matches!(options.replication_factor, Unchanged) {
253                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
254                }
255            }
256        }
257
258        match &options.workload_class {
259            Set(wc) => new_config.workload_class.clone_from(wc),
260            Reset => new_config.workload_class = None,
261            Unchanged => {}
262        }
263
264        if new_config == config {
265            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
266                ObjectType::Cluster,
267            )));
268        }
269
270        let new_workload_class = new_config.workload_class.clone();
271        match (&config.variant, &new_config.variant) {
272            (Managed(_), Managed(new_config_managed)) => {
273                let alter_followup = self
274                    .sequence_alter_cluster_managed_to_managed(
275                        Some(session),
276                        cluster_id,
277                        new_config.clone(),
278                        ReplicaCreateDropReason::Manual,
279                        strategy.clone(),
280                    )
281                    .await?;
282                if alter_followup == NeedsFinalization::Yes {
283                    // For non backgrounded zero-downtime alters, store the
284                    // cluster_id in the ConnMeta to allow for cancellation.
285                    self.active_conns
286                        .get_mut(session.conn_id())
287                        .expect("There must be an active connection")
288                        .pending_cluster_alters
289                        .insert(cluster_id.clone());
290                    let new_config_managed = new_config_managed.clone();
291                    return match &strategy {
292                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
293                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
294                                .into(),
295                        )),
296                        AlterClusterPlanStrategy::For(duration) => {
297                            let span = Span::current();
298                            let plan = plan.clone();
299                            let duration = duration.clone().to_owned();
300                            Ok(StageResult::Handle(mz_ore::task::spawn(
301                                || "Finalize Alter Cluster",
302                                async move {
303                                    tokio::time::sleep(duration).await;
304                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
305                                        validity,
306                                        plan,
307                                        new_config: new_config_managed,
308                                    });
309                                    Ok(Box::new(stage))
310                                }
311                                .instrument(span),
312                            )))
313                        }
314                        AlterClusterPlanStrategy::UntilReady {
315                            timeout,
316                            on_timeout,
317                        } => Ok(StageResult::Immediate(Box::new(
318                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
319                                validity,
320                                plan: plan.clone(),
321                                new_config: new_config_managed.clone(),
322                                timeout_time: Instant::now() + timeout.to_owned(),
323                                on_timeout: on_timeout.to_owned(),
324                            }),
325                        ))),
326                    };
327                }
328            }
329            (Unmanaged, Managed(_)) => {
330                self.sequence_alter_cluster_unmanaged_to_managed(
331                    session,
332                    cluster_id,
333                    new_config,
334                    options.to_owned(),
335                )
336                .await?;
337            }
338            (Managed(_), Unmanaged) => {
339                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
340                    .await?;
341            }
342            (Unmanaged, Unmanaged) => {
343                self.sequence_alter_cluster_unmanaged_to_unmanaged(
344                    session,
345                    cluster_id,
346                    new_config,
347                    options.replicas.clone(),
348                )
349                .await?;
350            }
351        }
352
353        self.controller
354            .update_cluster_workload_class(cluster_id, new_workload_class)?;
355
356        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
357            ObjectType::Cluster,
358        )))
359    }
360
361    async fn finalize_alter_cluster_stage(
362        &mut self,
363        session: &Session,
364        AlterClusterPlan {
365            id: cluster_id,
366            name: cluster_name,
367            ..
368        }: AlterClusterPlan,
369        new_config: ClusterVariantManaged,
370    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
371        let cluster = self.catalog.get_cluster(cluster_id);
372        let workload_class = cluster.config.workload_class.clone();
373        let mut ops = vec![];
374
375        // Gather the ops to remove the non pending replicas
376        // Also skip any billed_as free replicas
377        let remove_replicas = cluster
378            .replicas()
379            .filter_map(|r| {
380                if !r.config.location.pending() && !r.config.location.internal() {
381                    Some(catalog::DropObjectInfo::ClusterReplica((
382                        cluster_id.clone(),
383                        r.replica_id,
384                        ReplicaCreateDropReason::Manual,
385                    )))
386                } else {
387                    None
388                }
389            })
390            .collect();
391        ops.push(catalog::Op::DropObjects(remove_replicas));
392
393        // Gather the Ops to remove the "-pending" suffix from the name and set
394        // pending to false
395        let finalize_replicas: Vec<catalog::Op> = cluster
396            .replicas()
397            .filter_map(|r| {
398                if r.config.location.pending() {
399                    let cluster_ident = match Ident::new(cluster.name.clone()) {
400                        Ok(id) => id,
401                        Err(err) => {
402                            return Some(Err(AdapterError::internal(
403                                "Unexpected error parsing cluster name",
404                                err,
405                            )));
406                        }
407                    };
408                    let replica_ident = match Ident::new(r.name.clone()) {
409                        Ok(id) => id,
410                        Err(err) => {
411                            return Some(Err(AdapterError::internal(
412                                "Unexpected error parsing replica name",
413                                err,
414                            )));
415                        }
416                    };
417                    Some(Ok((cluster_ident, replica_ident, r)))
418                } else {
419                    None
420                }
421            })
422            // Early collection is to handle errors from generating of the
423            // Idents
424            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
425            .into_iter()
426            .map(|(cluster_ident, replica_ident, replica)| {
427                let mut new_replica_config = replica.config.clone();
428                debug!("Promoting replica: {}", replica.name);
429                match new_replica_config.location {
430                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
431                        ref mut pending,
432                        ..
433                    }) => {
434                        *pending = false;
435                    }
436                    _ => {}
437                }
438
439                let mut replica_ops = vec![];
440                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
441                if let Some(to_name) = to_name {
442                    replica_ops.push(catalog::Op::RenameClusterReplica {
443                        cluster_id: cluster_id.clone(),
444                        replica_id: replica.replica_id.to_owned(),
445                        name: QualifiedReplica {
446                            cluster: cluster_ident,
447                            replica: replica_ident,
448                        },
449                        to_name: to_name.to_owned(),
450                    });
451                }
452                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
453                    cluster_id,
454                    replica_id: replica.replica_id.to_owned(),
455                    config: new_replica_config,
456                });
457                replica_ops
458            })
459            .flatten()
460            .collect();
461
462        ops.extend(finalize_replicas);
463
464        // Add the Op to update the cluster state
465        ops.push(Op::UpdateClusterConfig {
466            id: cluster_id,
467            name: cluster_name,
468            config: ClusterConfig {
469                variant: ClusterVariant::Managed(new_config),
470                workload_class: workload_class.clone(),
471            },
472        });
473        self.catalog_transact(Some(session), ops).await?;
474        // Remove the cluster being altered from the ConnMeta
475        // pending_cluster_alters BTreeSet
476        self.active_conns
477            .get_mut(session.conn_id())
478            .expect("There must be an active connection")
479            .pending_cluster_alters
480            .remove(&cluster_id);
481
482        self.controller
483            .update_cluster_workload_class(cluster_id, workload_class)?;
484
485        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
486            ObjectType::Cluster,
487        )))
488    }
489
490    async fn check_if_pending_replicas_hydrated_stage(
491        &mut self,
492        session: &Session,
493        plan: AlterClusterPlan,
494        new_config: ClusterVariantManaged,
495        timeout_time: Instant,
496        on_timeout: OnTimeoutAction,
497        validity: PlanValidity,
498    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
499        // wait and re-signal wait for hydrated if not hydrated
500        let cluster = self.catalog.get_cluster(plan.id);
501        let pending_replicas = cluster
502            .replicas()
503            .filter_map(|r| {
504                if r.config.location.pending() {
505                    Some(r.replica_id.clone())
506                } else {
507                    None
508                }
509            })
510            .collect_vec();
511        // Check For timeout
512        if Instant::now() > timeout_time {
513            // Timed out handle timeout action
514            match on_timeout {
515                OnTimeoutAction::Rollback => {
516                    self.active_conns
517                        .get_mut(session.conn_id())
518                        .expect("There must be an active connection")
519                        .pending_cluster_alters
520                        .remove(&cluster.id);
521                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
522                        .await?;
523                    return Err(AdapterError::AlterClusterTimeout);
524                }
525                OnTimeoutAction::Commit => {
526                    let span = Span::current();
527                    let poll_duration = self
528                        .catalog
529                        .system_config()
530                        .cluster_alter_check_ready_interval()
531                        .clone();
532                    return Ok(StageResult::Handle(mz_ore::task::spawn(
533                        || "Finalize Alter Cluster",
534                        async move {
535                            tokio::time::sleep(poll_duration).await;
536                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
537                                validity,
538                                plan,
539                                new_config,
540                            });
541                            Ok(Box::new(stage))
542                        }
543                        .instrument(span),
544                    )));
545                }
546            }
547        }
548        let compute_hydrated_fut = self
549            .controller
550            .compute
551            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
552            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
553
554        let storage_hydrated = self
555            .controller
556            .storage
557            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
558            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
559
560        let span = Span::current();
561        Ok(StageResult::Handle(mz_ore::task::spawn(
562            || "Alter Cluster: wait for hydrated",
563            async move {
564                let compute_hydrated = compute_hydrated_fut
565                    .await
566                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
567
568                if compute_hydrated && storage_hydrated {
569                    // We're done
570                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
571                        validity,
572                        plan,
573                        new_config,
574                    })))
575                } else {
576                    // Check later
577                    tokio::time::sleep(Duration::from_secs(1)).await;
578                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
579                        validity,
580                        plan,
581                        new_config,
582                        timeout_time,
583                        on_timeout,
584                    });
585                    Ok(Box::new(stage))
586                }
587            }
588            .instrument(span),
589        )))
590    }
591
592    #[mz_ore::instrument(level = "debug")]
593    pub(crate) async fn sequence_create_cluster(
594        &mut self,
595        session: &Session,
596        CreateClusterPlan {
597            name,
598            variant,
599            workload_class,
600        }: CreateClusterPlan,
601    ) -> Result<ExecuteResponse, AdapterError> {
602        tracing::debug!("sequence_create_cluster");
603
604        let id_ts = self.get_catalog_write_ts().await;
605        let id = self.catalog_mut().allocate_user_cluster_id(id_ts).await?;
606        // The catalog items for the introspection sources are shared between all replicas
607        // of a compute instance, so we create them unconditionally during instance creation.
608        // Whether a replica actually maintains introspection arrangements is determined by the
609        // per-replica introspection configuration.
610        let introspection_sources = BUILTINS::logs().collect();
611        let cluster_variant = match &variant {
612            CreateClusterVariant::Managed(plan) => {
613                let logging = if let Some(config) = plan.compute.introspection {
614                    ReplicaLogging {
615                        log_logging: config.debugging,
616                        interval: Some(config.interval),
617                    }
618                } else {
619                    ReplicaLogging::default()
620                };
621                ClusterVariant::Managed(ClusterVariantManaged {
622                    size: plan.size.clone(),
623                    availability_zones: plan.availability_zones.clone(),
624                    logging,
625                    replication_factor: plan.replication_factor,
626                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
627                    schedule: plan.schedule.clone(),
628                })
629            }
630            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
631        };
632        let config = ClusterConfig {
633            variant: cluster_variant,
634            workload_class,
635        };
636        let ops = vec![catalog::Op::CreateCluster {
637            id,
638            name: name.clone(),
639            introspection_sources,
640            owner_id: *session.current_role_id(),
641            config,
642        }];
643
644        match variant {
645            CreateClusterVariant::Managed(plan) => {
646                self.sequence_create_managed_cluster(session, plan, id, ops)
647                    .await
648            }
649            CreateClusterVariant::Unmanaged(plan) => {
650                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
651                    .await
652            }
653        }
654    }
655
656    #[mz_ore::instrument(level = "debug")]
657    async fn sequence_create_managed_cluster(
658        &mut self,
659        session: &Session,
660        CreateClusterManagedPlan {
661            availability_zones,
662            compute,
663            replication_factor,
664            size,
665            optimizer_feature_overrides: _,
666            schedule: _,
667        }: CreateClusterManagedPlan,
668        cluster_id: ClusterId,
669        mut ops: Vec<catalog::Op>,
670    ) -> Result<ExecuteResponse, AdapterError> {
671        tracing::debug!("sequence_create_managed_cluster");
672
673        self.ensure_valid_azs(availability_zones.iter())?;
674
675        let role_id = session.role_metadata().current_role;
676        self.catalog.ensure_valid_replica_size(
677            &self
678                .catalog()
679                .get_role_allowed_cluster_sizes(&Some(role_id)),
680            &size,
681        )?;
682
683        // Eagerly validate the `max_replicas_per_cluster` limit.
684        // `catalog_transact` will do this validation too, but allocating
685        // replica IDs is expensive enough that we need to do this validation
686        // before allocating replica IDs. See database-issues#6046.
687        if cluster_id.is_user() {
688            self.validate_resource_limit(
689                0,
690                i64::from(replication_factor),
691                SystemVars::max_replicas_per_cluster,
692                "cluster replica",
693                MAX_REPLICAS_PER_CLUSTER.name(),
694            )?;
695        }
696
697        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
698            self.create_managed_cluster_replica_op(
699                cluster_id,
700                replica_name,
701                &compute,
702                &size,
703                &mut ops,
704                if availability_zones.is_empty() {
705                    None
706                } else {
707                    Some(availability_zones.as_ref())
708                },
709                false,
710                *session.current_role_id(),
711                ReplicaCreateDropReason::Manual,
712            )?;
713        }
714
715        self.catalog_transact(Some(session), ops).await?;
716
717        self.create_cluster(cluster_id).await;
718
719        Ok(ExecuteResponse::CreatedCluster)
720    }
721
722    fn create_managed_cluster_replica_op(
723        &self,
724        cluster_id: ClusterId,
725        name: String,
726        compute: &mz_sql::plan::ComputeReplicaConfig,
727        size: &String,
728        ops: &mut Vec<Op>,
729        azs: Option<&[String]>,
730        pending: bool,
731        owner_id: RoleId,
732        reason: ReplicaCreateDropReason,
733    ) -> Result<(), AdapterError> {
734        let location = mz_catalog::durable::ReplicaLocation::Managed {
735            availability_zone: None,
736            billed_as: None,
737            internal: false,
738            size: size.clone(),
739            pending,
740        };
741
742        let logging = if let Some(config) = compute.introspection {
743            ReplicaLogging {
744                log_logging: config.debugging,
745                interval: Some(config.interval),
746            }
747        } else {
748            ReplicaLogging::default()
749        };
750
751        let config = ReplicaConfig {
752            location: self.catalog().concretize_replica_location(
753                location,
754                &self
755                    .catalog()
756                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
757                azs,
758            )?,
759            compute: ComputeReplicaConfig { logging },
760        };
761
762        ops.push(catalog::Op::CreateClusterReplica {
763            cluster_id,
764            name,
765            config,
766            owner_id,
767            reason,
768        });
769        Ok(())
770    }
771
772    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
773        &self,
774        azs: I,
775    ) -> Result<(), AdapterError> {
776        let cat_azs = self.catalog().state().availability_zones();
777        for az in azs.into_iter() {
778            if !cat_azs.contains(az) {
779                return Err(AdapterError::InvalidClusterReplicaAz {
780                    az: az.to_string(),
781                    expected: cat_azs.to_vec(),
782                });
783            }
784        }
785        Ok(())
786    }
787
788    #[mz_ore::instrument(level = "debug")]
789    async fn sequence_create_unmanaged_cluster(
790        &mut self,
791        session: &Session,
792        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
793        id: ClusterId,
794        mut ops: Vec<catalog::Op>,
795    ) -> Result<ExecuteResponse, AdapterError> {
796        tracing::debug!("sequence_create_unmanaged_cluster");
797
798        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
799            if let mz_sql::plan::ReplicaConfig::Orchestrated {
800                availability_zone: Some(az),
801                ..
802            } = &r
803            {
804                Some(az)
805            } else {
806                None
807            }
808        }))?;
809
810        // Eagerly validate the `max_replicas_per_cluster` limit.
811        // `catalog_transact` will do this validation too, but allocating
812        // replica IDs is expensive enough that we need to do this validation
813        // before allocating replica IDs. See database-issues#6046.
814        if id.is_user() {
815            self.validate_resource_limit(
816                0,
817                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
818                SystemVars::max_replicas_per_cluster,
819                "cluster replica",
820                MAX_REPLICAS_PER_CLUSTER.name(),
821            )?;
822        }
823
824        for (replica_name, replica_config) in replicas {
825            // If the AZ was not specified, choose one, round-robin, from the ones with
826            // the lowest number of configured replicas for this cluster.
827            let (compute, location) = match replica_config {
828                mz_sql::plan::ReplicaConfig::Unorchestrated {
829                    storagectl_addrs,
830                    computectl_addrs,
831                    compute,
832                } => {
833                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
834                        storagectl_addrs,
835                        computectl_addrs,
836                    };
837                    (compute, location)
838                }
839                mz_sql::plan::ReplicaConfig::Orchestrated {
840                    availability_zone,
841                    billed_as,
842                    compute,
843                    internal,
844                    size,
845                } => {
846                    // Only internal users have access to INTERNAL and BILLED AS
847                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
848                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
849                    }
850                    // BILLED AS implies the INTERNAL flag.
851                    if billed_as.is_some() && !internal {
852                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
853                    }
854
855                    let location = mz_catalog::durable::ReplicaLocation::Managed {
856                        availability_zone,
857                        billed_as,
858                        internal,
859                        size: size.clone(),
860                        pending: false,
861                    };
862                    (compute, location)
863                }
864            };
865
866            let logging = if let Some(config) = compute.introspection {
867                ReplicaLogging {
868                    log_logging: config.debugging,
869                    interval: Some(config.interval),
870                }
871            } else {
872                ReplicaLogging::default()
873            };
874
875            let role_id = session.role_metadata().current_role;
876            let config = ReplicaConfig {
877                location: self.catalog().concretize_replica_location(
878                    location,
879                    &self
880                        .catalog()
881                        .get_role_allowed_cluster_sizes(&Some(role_id)),
882                    None,
883                )?,
884                compute: ComputeReplicaConfig { logging },
885            };
886
887            ops.push(catalog::Op::CreateClusterReplica {
888                cluster_id: id,
889                name: replica_name.clone(),
890                config,
891                owner_id: *session.current_role_id(),
892                reason: ReplicaCreateDropReason::Manual,
893            });
894        }
895
896        self.catalog_transact(Some(session), ops).await?;
897
898        self.create_cluster(id).await;
899
900        Ok(ExecuteResponse::CreatedCluster)
901    }
902
903    async fn create_cluster(&mut self, cluster_id: ClusterId) {
904        let Coordinator {
905            catalog,
906            controller,
907            ..
908        } = self;
909        let cluster = catalog.get_cluster(cluster_id);
910        let cluster_id = cluster.id;
911        let introspection_source_ids: Vec<_> =
912            cluster.log_indexes.iter().map(|(_, id)| *id).collect();
913
914        controller
915            .create_cluster(
916                cluster_id,
917                mz_controller::clusters::ClusterConfig {
918                    arranged_logs: cluster.log_indexes.clone(),
919                    workload_class: cluster.config.workload_class.clone(),
920                },
921            )
922            .expect("creating cluster must not fail");
923
924        let replica_ids: Vec<_> = cluster
925            .replicas()
926            .map(|r| (r.replica_id, format!("{}.{}", cluster.name(), &r.name)))
927            .collect();
928        for (replica_id, replica_name) in replica_ids {
929            self.create_cluster_replica(cluster_id, replica_id, replica_name)
930                .await;
931        }
932
933        if !introspection_source_ids.is_empty() {
934            self.initialize_compute_read_policies(
935                introspection_source_ids,
936                cluster_id,
937                CompactionWindow::Default,
938            )
939            .await;
940        }
941    }
942
943    #[mz_ore::instrument(level = "debug")]
944    pub(crate) async fn sequence_create_cluster_replica(
945        &mut self,
946        session: &Session,
947        CreateClusterReplicaPlan {
948            name,
949            cluster_id,
950            config,
951        }: CreateClusterReplicaPlan,
952    ) -> Result<ExecuteResponse, AdapterError> {
953        // Choose default AZ if necessary
954        let (compute, location) = match config {
955            mz_sql::plan::ReplicaConfig::Unorchestrated {
956                storagectl_addrs,
957                computectl_addrs,
958                compute,
959            } => {
960                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
961                    storagectl_addrs,
962                    computectl_addrs,
963                };
964                (compute, location)
965            }
966            mz_sql::plan::ReplicaConfig::Orchestrated {
967                availability_zone,
968                billed_as,
969                compute,
970                internal,
971                size,
972            } => {
973                let availability_zone = match availability_zone {
974                    Some(az) => {
975                        self.ensure_valid_azs([&az])?;
976                        Some(az)
977                    }
978                    None => None,
979                };
980                let location = mz_catalog::durable::ReplicaLocation::Managed {
981                    availability_zone,
982                    billed_as,
983                    internal,
984                    size,
985                    pending: false,
986                };
987                (compute, location)
988            }
989        };
990
991        let logging = if let Some(config) = compute.introspection {
992            ReplicaLogging {
993                log_logging: config.debugging,
994                interval: Some(config.interval),
995            }
996        } else {
997            ReplicaLogging::default()
998        };
999
1000        let role_id = session.role_metadata().current_role;
1001        let config = ReplicaConfig {
1002            location: self.catalog().concretize_replica_location(
1003                location,
1004                &self
1005                    .catalog()
1006                    .get_role_allowed_cluster_sizes(&Some(role_id)),
1007                // Planning ensures all replicas in this codepath
1008                // are unmanaged.
1009                None,
1010            )?,
1011            compute: ComputeReplicaConfig { logging },
1012        };
1013
1014        let cluster = self.catalog().get_cluster(cluster_id);
1015
1016        if let ReplicaLocation::Managed(ManagedReplicaLocation {
1017            internal,
1018            billed_as,
1019            ..
1020        }) = &config.location
1021        {
1022            // Only internal users have access to INTERNAL and BILLED AS
1023            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
1024                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
1025            }
1026            // Managed clusters require the INTERNAL flag.
1027            if cluster.is_managed() && !*internal {
1028                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
1029            }
1030            // BILLED AS implies the INTERNAL flag.
1031            if billed_as.is_some() && !*internal {
1032                coord_bail!("must specify INTERNAL when specifying BILLED AS");
1033            }
1034        }
1035
1036        // Replicas have the same owner as their cluster.
1037        let owner_id = cluster.owner_id();
1038        let op = catalog::Op::CreateClusterReplica {
1039            cluster_id,
1040            name: name.clone(),
1041            config,
1042            owner_id,
1043            reason: ReplicaCreateDropReason::Manual,
1044        };
1045
1046        self.catalog_transact(Some(session), vec![op]).await?;
1047
1048        let id = self
1049            .catalog()
1050            .resolve_replica_in_cluster(&cluster_id, &name)
1051            .expect("just created")
1052            .replica_id();
1053
1054        self.create_cluster_replica(cluster_id, id, name).await;
1055
1056        Ok(ExecuteResponse::CreatedClusterReplica)
1057    }
1058
1059    async fn create_cluster_replica(
1060        &mut self,
1061        cluster_id: ClusterId,
1062        replica_id: ReplicaId,
1063        replica_name: String,
1064    ) {
1065        let cluster = self.catalog().get_cluster(cluster_id);
1066        let role = cluster.role();
1067        let replica_config = cluster
1068            .replica(replica_id)
1069            .expect("known to exist")
1070            .config
1071            .clone();
1072
1073        let enable_worker_core_affinity =
1074            self.catalog().system_config().enable_worker_core_affinity();
1075
1076        self.controller
1077            .create_replica(
1078                cluster_id,
1079                replica_id,
1080                cluster.name.to_owned(),
1081                replica_name,
1082                role,
1083                replica_config,
1084                enable_worker_core_affinity,
1085            )
1086            .expect("creating replicas must not fail");
1087
1088        self.install_introspection_subscribes(cluster_id, replica_id)
1089            .await;
1090    }
1091
1092    /// When this is called by the automated cluster scheduling, `scheduling_decision_reason` should
1093    /// contain information on why is a cluster being turned On/Off. It will be forwarded to the
1094    /// `details` field of the audit log event that records creating or dropping replicas.
1095    ///
1096    /// # Panics
1097    ///
1098    /// Panics if the identified cluster is not a managed cluster.
1099    /// Panics if `new_config` is not a configuration for a managed cluster.
1100    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
1101        &mut self,
1102        session: Option<&Session>,
1103        cluster_id: ClusterId,
1104        new_config: ClusterConfig,
1105        reason: ReplicaCreateDropReason,
1106        strategy: AlterClusterPlanStrategy,
1107    ) -> Result<NeedsFinalization, AdapterError> {
1108        let cluster = self.catalog.get_cluster(cluster_id);
1109        let name = cluster.name().to_string();
1110        let owner_id = cluster.owner_id();
1111
1112        let mut ops = vec![];
1113        let mut create_cluster_replicas = vec![];
1114        let mut finalization_needed = NeedsFinalization::No;
1115
1116        let ClusterVariant::Managed(ClusterVariantManaged {
1117            size,
1118            availability_zones,
1119            logging,
1120            replication_factor,
1121            optimizer_feature_overrides: _,
1122            schedule: _,
1123        }) = &cluster.config.variant
1124        else {
1125            panic!("expected existing managed cluster config");
1126        };
1127        let ClusterVariant::Managed(ClusterVariantManaged {
1128            size: new_size,
1129            replication_factor: new_replication_factor,
1130            availability_zones: new_availability_zones,
1131            logging: new_logging,
1132            optimizer_feature_overrides: _,
1133            schedule: _,
1134        }) = &new_config.variant
1135        else {
1136            panic!("expected new managed cluster config");
1137        };
1138
1139        let role_id = session.map(|s| s.role_metadata().current_role);
1140        self.catalog.ensure_valid_replica_size(
1141            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
1142            new_size,
1143        )?;
1144
1145        // check for active updates
1146        if cluster.replicas().any(|r| r.config.location.pending()) {
1147            return Err(AlterClusterWhilePendingReplicas);
1148        }
1149
1150        let compute = mz_sql::plan::ComputeReplicaConfig {
1151            introspection: new_logging
1152                .interval
1153                .map(|interval| ComputeReplicaIntrospectionConfig {
1154                    debugging: new_logging.log_logging,
1155                    interval,
1156                }),
1157        };
1158
1159        // Eagerly validate the `max_replicas_per_cluster` limit.
1160        // `catalog_transact` will do this validation too, but allocating
1161        // replica IDs is expensive enough that we need to do this validation
1162        // before allocating replica IDs. See database-issues#6046.
1163        if new_replication_factor > replication_factor {
1164            if cluster_id.is_user() {
1165                self.validate_resource_limit(
1166                    usize::cast_from(*replication_factor),
1167                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
1168                    SystemVars::max_replicas_per_cluster,
1169                    "cluster replica",
1170                    MAX_REPLICAS_PER_CLUSTER.name(),
1171                )?;
1172            }
1173        }
1174
1175        if new_size != size
1176            || new_availability_zones != availability_zones
1177            || new_logging != logging
1178        {
1179            self.ensure_valid_azs(new_availability_zones.iter())?;
1180            // If we're not doing a zero-downtime reconfig tear down all
1181            // replicas, create new ones else create the pending replicas and
1182            // return early asking for finalization
1183            match strategy {
1184                AlterClusterPlanStrategy::None => {
1185                    let replica_ids_and_reasons = (0..*replication_factor)
1186                        .map(managed_cluster_replica_name)
1187                        .filter_map(|name| cluster.replica_id(&name))
1188                        .map(|replica_id| {
1189                            catalog::DropObjectInfo::ClusterReplica((
1190                                cluster.id(),
1191                                replica_id,
1192                                reason.clone(),
1193                            ))
1194                        })
1195                        .collect();
1196                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
1197                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
1198                        self.create_managed_cluster_replica_op(
1199                            cluster_id,
1200                            name.clone(),
1201                            &compute,
1202                            new_size,
1203                            &mut ops,
1204                            Some(new_availability_zones.as_ref()),
1205                            false,
1206                            owner_id,
1207                            reason.clone(),
1208                        )?;
1209                        create_cluster_replicas.push((cluster_id, name));
1210                    }
1211                }
1212                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
1213                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
1214                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
1215                        self.create_managed_cluster_replica_op(
1216                            cluster_id,
1217                            name.clone(),
1218                            &compute,
1219                            new_size,
1220                            &mut ops,
1221                            Some(new_availability_zones.as_ref()),
1222                            true,
1223                            owner_id,
1224                            reason.clone(),
1225                        )?;
1226                        create_cluster_replicas.push((cluster_id, name));
1227                    }
1228                    finalization_needed = NeedsFinalization::Yes;
1229                }
1230            }
1231        } else if new_replication_factor < replication_factor {
1232            // Adjust replica count down
1233            let replica_ids = (*new_replication_factor..*replication_factor)
1234                .map(managed_cluster_replica_name)
1235                .filter_map(|name| cluster.replica_id(&name))
1236                .map(|replica_id| {
1237                    catalog::DropObjectInfo::ClusterReplica((
1238                        cluster.id(),
1239                        replica_id,
1240                        reason.clone(),
1241                    ))
1242                })
1243                .collect();
1244            ops.push(catalog::Op::DropObjects(replica_ids));
1245        } else if new_replication_factor > replication_factor {
1246            // Adjust replica count up
1247            for name in
1248                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
1249            {
1250                self.create_managed_cluster_replica_op(
1251                    cluster_id,
1252                    name.clone(),
1253                    &compute,
1254                    new_size,
1255                    &mut ops,
1256                    // AVAILABILITY ZONES hasn't changed, so existing replicas don't need to be
1257                    // rescheduled.
1258                    Some(new_availability_zones.as_ref()),
1259                    false,
1260                    owner_id,
1261                    reason.clone(),
1262                )?;
1263                create_cluster_replicas.push((cluster_id, name))
1264            }
1265        }
1266
1267        // If finalization is needed, finalization should update the cluster
1268        // config.
1269        match finalization_needed {
1270            NeedsFinalization::No => {
1271                ops.push(catalog::Op::UpdateClusterConfig {
1272                    id: cluster_id,
1273                    name: name.clone(),
1274                    config: new_config,
1275                });
1276            }
1277            _ => {}
1278        }
1279        self.catalog_transact(session, ops.clone()).await?;
1280        for (cluster_id, replica_name) in create_cluster_replicas {
1281            let replica_id = self
1282                .catalog()
1283                .resolve_replica_in_cluster(&cluster_id, &replica_name)
1284                .expect("just created")
1285                .replica_id();
1286            self.create_cluster_replica(cluster_id, replica_id, replica_name)
1287                .await;
1288        }
1289        Ok(finalization_needed)
1290    }
1291
1292    /// # Panics
1293    ///
1294    /// Panics if `new_config` is not a configuration for a managed cluster.
1295    async fn sequence_alter_cluster_unmanaged_to_managed(
1296        &mut self,
1297        session: &Session,
1298        cluster_id: ClusterId,
1299        mut new_config: ClusterConfig,
1300        options: PlanClusterOption,
1301    ) -> Result<(), AdapterError> {
1302        let cluster = self.catalog.get_cluster(cluster_id);
1303        let cluster_name = cluster.name().to_string();
1304
1305        let ClusterVariant::Managed(ClusterVariantManaged {
1306            size: new_size,
1307            replication_factor: new_replication_factor,
1308            availability_zones: new_availability_zones,
1309            logging: _,
1310            optimizer_feature_overrides: _,
1311            schedule: _,
1312        }) = &mut new_config.variant
1313        else {
1314            panic!("expected new managed cluster config");
1315        };
1316
1317        // Validate replication factor parameter
1318        let user_replica_count = cluster
1319            .user_replicas()
1320            .count()
1321            .try_into()
1322            .expect("must_fit");
1323        match options.replication_factor {
1324            AlterOptionParameter::Set(_) => {
1325                // Validate that the replication factor matches the current length only if specified.
1326                if user_replica_count != *new_replication_factor {
1327                    coord_bail!(
1328                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
1329                    );
1330                }
1331            }
1332            _ => {
1333                *new_replication_factor = user_replica_count;
1334            }
1335        }
1336
1337        let mut names = BTreeSet::new();
1338        let mut sizes = BTreeSet::new();
1339
1340        self.ensure_valid_azs(new_availability_zones.iter())?;
1341
1342        // Validate per-replica configuration
1343        for replica in cluster.user_replicas() {
1344            names.insert(replica.name.clone());
1345            match &replica.config.location {
1346                ReplicaLocation::Unmanaged(_) => coord_bail!(
1347                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
1348                ),
1349                ReplicaLocation::Managed(location) => {
1350                    sizes.insert(location.size.clone());
1351
1352                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
1353                        &location.availability_zones
1354                    {
1355                        if !new_availability_zones.contains(az) {
1356                            coord_bail!(
1357                                "unmanaged replica has availability zone {az} which is not \
1358                                in managed {new_availability_zones:?}"
1359                            )
1360                        }
1361                    }
1362                }
1363            }
1364        }
1365
1366        if sizes.is_empty() {
1367            assert!(
1368                cluster.user_replicas().next().is_none(),
1369                "Cluster should not have replicas"
1370            );
1371            // We didn't collect any size, so the user has to name it.
1372            match &options.size {
1373                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
1374                    coord_bail!("Missing SIZE for empty cluster")
1375                }
1376                _ => {} // Was set within the calling function.
1377            }
1378        } else if sizes.len() == 1 {
1379            let size = sizes.into_iter().next().expect("must exist");
1380            match &options.size {
1381                AlterOptionParameter::Set(sz) if *sz != size => {
1382                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
1383                }
1384                _ => *new_size = size,
1385            }
1386        } else {
1387            let formatted = sizes
1388                .iter()
1389                .map(String::as_str)
1390                .collect::<Vec<_>>()
1391                .join(", ");
1392            coord_bail!(
1393                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
1394            );
1395        }
1396
1397        for i in 0..*new_replication_factor {
1398            let name = managed_cluster_replica_name(i);
1399            names.remove(&name);
1400        }
1401        if !names.is_empty() {
1402            let formatted = names
1403                .iter()
1404                .map(String::as_str)
1405                .collect::<Vec<_>>()
1406                .join(", ");
1407            coord_bail!(
1408                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
1409            );
1410        }
1411
1412        let ops = vec![catalog::Op::UpdateClusterConfig {
1413            id: cluster_id,
1414            name: cluster_name,
1415            config: new_config,
1416        }];
1417
1418        self.catalog_transact(Some(session), ops).await?;
1419        Ok(())
1420    }
1421
1422    async fn sequence_alter_cluster_managed_to_unmanaged(
1423        &mut self,
1424        session: &Session,
1425        cluster_id: ClusterId,
1426        new_config: ClusterConfig,
1427    ) -> Result<(), AdapterError> {
1428        let cluster = self.catalog().get_cluster(cluster_id);
1429
1430        let ops = vec![catalog::Op::UpdateClusterConfig {
1431            id: cluster_id,
1432            name: cluster.name().to_string(),
1433            config: new_config,
1434        }];
1435
1436        self.catalog_transact(Some(session), ops).await?;
1437        Ok(())
1438    }
1439
1440    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
1441        &mut self,
1442        session: &Session,
1443        cluster_id: ClusterId,
1444        new_config: ClusterConfig,
1445        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
1446    ) -> Result<(), AdapterError> {
1447        if !matches!(replicas, AlterOptionParameter::Unchanged) {
1448            coord_bail!("Cannot alter replicas in unmanaged cluster");
1449        }
1450
1451        let cluster = self.catalog().get_cluster(cluster_id);
1452
1453        let ops = vec![catalog::Op::UpdateClusterConfig {
1454            id: cluster_id,
1455            name: cluster.name().to_string(),
1456            config: new_config,
1457        }];
1458
1459        self.catalog_transact(Some(session), ops).await?;
1460        Ok(())
1461    }
1462
1463    pub(crate) async fn sequence_alter_cluster_rename(
1464        &mut self,
1465        ctx: &mut ExecuteContext,
1466        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
1467    ) -> Result<ExecuteResponse, AdapterError> {
1468        let op = Op::RenameCluster {
1469            id,
1470            name,
1471            to_name,
1472            check_reserved_names: true,
1473        };
1474        match self
1475            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
1476            .await
1477        {
1478            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1479            Err(err) => Err(err),
1480        }
1481    }
1482
1483    pub(crate) async fn sequence_alter_cluster_swap(
1484        &mut self,
1485        ctx: &mut ExecuteContext,
1486        AlterClusterSwapPlan {
1487            id_a,
1488            id_b,
1489            name_a,
1490            name_b,
1491            name_temp,
1492        }: AlterClusterSwapPlan,
1493    ) -> Result<ExecuteResponse, AdapterError> {
1494        let op_a = Op::RenameCluster {
1495            id: id_a,
1496            name: name_a.clone(),
1497            to_name: name_temp.clone(),
1498            check_reserved_names: false,
1499        };
1500        let op_b = Op::RenameCluster {
1501            id: id_b,
1502            name: name_b.clone(),
1503            to_name: name_a,
1504            check_reserved_names: false,
1505        };
1506        let op_temp = Op::RenameCluster {
1507            id: id_a,
1508            name: name_temp,
1509            to_name: name_b,
1510            check_reserved_names: false,
1511        };
1512
1513        match self
1514            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
1515                Box::pin(async {})
1516            })
1517            .await
1518        {
1519            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1520            Err(err) => Err(err),
1521        }
1522    }
1523
1524    pub(crate) async fn sequence_alter_cluster_replica_rename(
1525        &mut self,
1526        session: &Session,
1527        AlterClusterReplicaRenamePlan {
1528            cluster_id,
1529            replica_id,
1530            name,
1531            to_name,
1532        }: AlterClusterReplicaRenamePlan,
1533    ) -> Result<ExecuteResponse, AdapterError> {
1534        let op = catalog::Op::RenameClusterReplica {
1535            cluster_id,
1536            replica_id,
1537            name,
1538            to_name,
1539        };
1540        match self.catalog_transact(Some(session), vec![op]).await {
1541            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
1542            Err(err) => Err(err),
1543        }
1544    }
1545
1546    /// Convert a [`AlterSetClusterPlan`] to a sequence of catalog operators and adjust state.
1547    pub(crate) async fn sequence_alter_set_cluster(
1548        &self,
1549        _session: &Session,
1550        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
1551    ) -> Result<ExecuteResponse, AdapterError> {
1552        // TODO: This function needs to be implemented.
1553
1554        // Satisfy Clippy that this is an async func.
1555        async {}.await;
1556        let entry = self.catalog().get_entry(&id);
1557        match entry.item().typ() {
1558            _ => {
1559                // Unexpected; planner permitted unsupported plan.
1560                Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
1561            }
1562        }
1563    }
1564}
1565
/// Returns the name of the replica at zero-based position `index` in a managed cluster.
///
/// Names are one-based: index `0` maps to `r1`, index `1` to `r2`, and so on.
fn managed_cluster_replica_name(index: u32) -> String {
    // Widen before adding one so that `u32::MAX` cannot overflow (a panic in debug builds, a
    // silent wrap to `r0` in release builds).
    format!("r{}", u64::from(index) + 1)
}
1569
/// Whether an operation such as `sequence_alter_cluster_managed_to_managed` left work behind
/// that still has to be finalized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NeedsFinalization {
    /// Finalization is required: the operation created pending replicas and did not update
    /// the cluster config itself.
    Yes,
    /// The operation is complete; nothing is left to finalize.
    No,
}