Skip to main content

mz_adapter/coord/sequencer/inner/
cluster.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::time::{Duration, Instant};
12
13use itertools::Itertools;
14use maplit::btreeset;
15use mz_catalog::builtin::BUILTINS;
16use mz_catalog::memory::objects::{
17    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
18};
19use mz_compute_types::config::ComputeReplicaConfig;
20use mz_controller::clusters::{
21    ManagedReplicaLocation, ReplicaConfig, ReplicaLocation, ReplicaLogging,
22};
23use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
24use mz_ore::cast::CastFrom;
25use mz_ore::collections::CollectionExt;
26use mz_ore::instrument;
27use mz_repr::role_id::RoleId;
28use mz_sql::ast::{Ident, QualifiedReplica};
29use mz_sql::catalog::{CatalogCluster, ObjectType};
30use mz_sql::plan::{
31    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
32    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
33    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
34    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
35};
36use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
39use tracing::{Instrument, Span, debug};
40
41use super::return_if_err;
42use crate::AdapterError::AlterClusterWhilePendingReplicas;
43use crate::catalog::{self, Op, ReplicaCreateDropReason};
44use crate::coord::{
45    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
46    Message, PlanValidity, StageResult, Staged,
47};
48use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};
49
/// Suffix carried by the names of pending replicas. When an alter is
/// finalized, a pending replica whose name ends with this suffix is renamed
/// to the name with the suffix stripped.
const PENDING_REPLICA_SUFFIX: &str = "-pending";
51
52impl Staged for ClusterStage {
53    type Ctx = ExecuteContext;
54
55    fn validity(&mut self) -> &mut PlanValidity {
56        match self {
57            Self::Alter(stage) => &mut stage.validity,
58            Self::WaitForHydrated(stage) => &mut stage.validity,
59            Self::Finalize(stage) => &mut stage.validity,
60        }
61    }
62
63    async fn stage(
64        self,
65        coord: &mut Coordinator,
66        ctx: &mut ExecuteContext,
67    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
68        match self {
69            Self::Alter(stage) => {
70                coord
71                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
72                    .await
73            }
74            Self::WaitForHydrated(stage) => {
75                let AlterClusterWaitForHydrated {
76                    validity,
77                    plan,
78                    new_config,
79                    workload_class,
80                    timeout_time,
81                    on_timeout,
82                } = stage;
83                coord
84                    .check_if_pending_replicas_hydrated_stage(
85                        ctx.session(),
86                        plan,
87                        new_config,
88                        workload_class,
89                        timeout_time,
90                        on_timeout,
91                        validity,
92                    )
93                    .await
94            }
95            Self::Finalize(stage) => {
96                coord
97                    .finalize_alter_cluster_stage(
98                        ctx.session(),
99                        stage.plan.clone(),
100                        stage.new_config.clone(),
101                        stage.workload_class.clone(),
102                    )
103                    .await
104            }
105        }
106    }
107
108    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
109        Message::ClusterStageReady {
110            ctx,
111            span,
112            stage: self,
113        }
114    }
115
116    fn cancel_enabled(&self) -> bool {
117        true
118    }
119}
120
121impl Coordinator {
122    #[instrument]
123    pub(crate) async fn sequence_alter_cluster_staged(
124        &mut self,
125        ctx: ExecuteContext,
126        plan: plan::AlterClusterPlan,
127    ) {
128        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
129        self.sequence_staged(ctx, Span::current(), stage).await;
130    }
131
132    #[instrument]
133    async fn alter_cluster_validate(
134        &self,
135        session: &Session,
136        plan: plan::AlterClusterPlan,
137    ) -> Result<ClusterStage, AdapterError> {
138        let validity = PlanValidity::new(
139            self.catalog().transient_revision(),
140            BTreeSet::new(),
141            Some(plan.id.clone()),
142            None,
143            session.role_metadata().clone(),
144        );
145        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
146    }
147
148    async fn sequence_alter_cluster_stage(
149        &mut self,
150        session: &Session,
151        plan: plan::AlterClusterPlan,
152        validity: PlanValidity,
153    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
154        let AlterClusterPlan {
155            id: cluster_id,
156            name: _,
157            ref options,
158            ref strategy,
159        } = plan;
160
161        use mz_catalog::memory::objects::ClusterVariant::*;
162        use mz_sql::plan::AlterOptionParameter::*;
163        let cluster = self.catalog.get_cluster(cluster_id);
164        let config = cluster.config.clone();
165        let mut new_config = config.clone();
166
167        match (&new_config.variant, &options.managed) {
168            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
169            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
170            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
171            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
172                // Generate a minimal correct configuration
173
174                // Size adjusted later when sequencing the actual configuration change.
175                let size = "".to_string();
176                let logging = ReplicaLogging {
177                    log_logging: false,
178                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
179                };
180                new_config.variant = Managed(ClusterVariantManaged {
181                    size,
182                    availability_zones: Default::default(),
183                    logging,
184                    replication_factor: 1,
185                    optimizer_feature_overrides: Default::default(),
186                    schedule: Default::default(),
187                    auto_scaling_strategy: None,
188                    reconfiguration: None,
189                    burst: None,
190                });
191            }
192        }
193
194        match &mut new_config.variant {
195            Managed(ClusterVariantManaged {
196                size,
197                availability_zones,
198                logging,
199                replication_factor,
200                optimizer_feature_overrides: _,
201                schedule,
202                auto_scaling_strategy: _,
203                reconfiguration: _,
204                burst: _,
205            }) => {
206                match &options.size {
207                    Set(s) => size.clone_from(s),
208                    Reset => coord_bail!("SIZE has no default value"),
209                    Unchanged => {}
210                }
211                match &options.availability_zones {
212                    Set(az) => availability_zones.clone_from(az),
213                    Reset => *availability_zones = Default::default(),
214                    Unchanged => {}
215                }
216                match &options.introspection_debugging {
217                    Set(id) => logging.log_logging = *id,
218                    Reset => logging.log_logging = false,
219                    Unchanged => {}
220                }
221                match &options.introspection_interval {
222                    Set(ii) => logging.interval = ii.0,
223                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
224                    Unchanged => {}
225                }
226                match &options.replication_factor {
227                    Set(rf) => *replication_factor = *rf,
228                    Reset => {
229                        *replication_factor = self
230                            .catalog
231                            .system_config()
232                            .default_cluster_replication_factor()
233                    }
234                    Unchanged => {}
235                }
236                match &options.schedule {
237                    Set(new_schedule) => {
238                        *schedule = new_schedule.clone();
239                    }
240                    Reset => *schedule = Default::default(),
241                    Unchanged => {}
242                }
243                if !matches!(options.replicas, Unchanged) {
244                    coord_bail!("Cannot change REPLICAS of managed clusters");
245                }
246            }
247            Unmanaged => {
248                if !matches!(options.size, Unchanged) {
249                    coord_bail!("Cannot change SIZE of unmanaged clusters");
250                }
251                if !matches!(options.availability_zones, Unchanged) {
252                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
253                }
254                if !matches!(options.introspection_debugging, Unchanged) {
255                    coord_bail!("Cannot change INTROSPECTION DEGUBBING of unmanaged clusters");
256                }
257                if !matches!(options.introspection_interval, Unchanged) {
258                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
259                }
260                if !matches!(options.replication_factor, Unchanged) {
261                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
262                }
263            }
264        }
265
266        match &options.workload_class {
267            Set(wc) => new_config.workload_class.clone_from(wc),
268            Reset => new_config.workload_class = None,
269            Unchanged => {}
270        }
271
272        if new_config == config {
273            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
274                ObjectType::Cluster,
275            )));
276        }
277
278        match (&config.variant, &new_config.variant) {
279            (Managed(_), Managed(new_config_managed)) => {
280                let alter_followup = self
281                    .sequence_alter_cluster_managed_to_managed(
282                        Some(session),
283                        cluster_id,
284                        new_config.clone(),
285                        ReplicaCreateDropReason::Manual,
286                        strategy.clone(),
287                    )
288                    .await?;
289                if alter_followup == NeedsFinalization::Yes {
290                    // For non backgrounded zero-downtime alters, store the
291                    // cluster_id in the ConnMeta to allow for cancellation.
292                    self.active_conns
293                        .get_mut(session.conn_id())
294                        .expect("There must be an active connection")
295                        .pending_cluster_alters
296                        .insert(cluster_id.clone());
297                    let new_config_managed = new_config_managed.clone();
298                    return match &strategy {
299                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
300                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
301                                .into(),
302                        )),
303                        AlterClusterPlanStrategy::For(duration) => {
304                            let span = Span::current();
305                            let plan = plan.clone();
306                            let duration = duration.clone().to_owned();
307                            let workload_class = new_config.workload_class.clone();
308                            Ok(StageResult::Handle(mz_ore::task::spawn(
309                                || "Finalize Alter Cluster",
310                                async move {
311                                    tokio::time::sleep(duration).await;
312                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
313                                        validity,
314                                        plan,
315                                        new_config: new_config_managed,
316                                        workload_class,
317                                    });
318                                    Ok(Box::new(stage))
319                                }
320                                .instrument(span),
321                            )))
322                        }
323                        AlterClusterPlanStrategy::UntilReady {
324                            timeout,
325                            on_timeout,
326                        } => Ok(StageResult::Immediate(Box::new(
327                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
328                                validity,
329                                plan: plan.clone(),
330                                new_config: new_config_managed.clone(),
331                                workload_class: new_config.workload_class.clone(),
332                                timeout_time: Instant::now() + timeout.to_owned(),
333                                on_timeout: on_timeout.to_owned(),
334                            }),
335                        ))),
336                    };
337                }
338            }
339            (Unmanaged, Managed(_)) => {
340                self.sequence_alter_cluster_unmanaged_to_managed(
341                    session,
342                    cluster_id,
343                    new_config,
344                    options.to_owned(),
345                )
346                .await?;
347            }
348            (Managed(_), Unmanaged) => {
349                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
350                    .await?;
351            }
352            (Unmanaged, Unmanaged) => {
353                self.sequence_alter_cluster_unmanaged_to_unmanaged(
354                    session,
355                    cluster_id,
356                    new_config,
357                    options.replicas.clone(),
358                )
359                .await?;
360            }
361        }
362
363        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
364            ObjectType::Cluster,
365        )))
366    }
367
368    async fn finalize_alter_cluster_stage(
369        &mut self,
370        session: &Session,
371        AlterClusterPlan {
372            id: cluster_id,
373            name: cluster_name,
374            ..
375        }: AlterClusterPlan,
376        new_config: ClusterVariantManaged,
377        workload_class: Option<String>,
378    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
379        let cluster = self.catalog.get_cluster(cluster_id);
380        let mut ops = vec![];
381
382        // Gather the ops to remove the non pending replicas
383        // Also skip any billed_as free replicas
384        let remove_replicas = cluster
385            .replicas()
386            .filter_map(|r| {
387                if !r.config.location.pending() && !r.config.location.internal() {
388                    Some(catalog::DropObjectInfo::ClusterReplica((
389                        cluster_id.clone(),
390                        r.replica_id,
391                        ReplicaCreateDropReason::Manual,
392                    )))
393                } else {
394                    None
395                }
396            })
397            .collect();
398        ops.push(catalog::Op::DropObjects(remove_replicas));
399
400        // Gather the Ops to remove the "-pending" suffix from the name and set
401        // pending to false
402        let finalize_replicas: Vec<catalog::Op> = cluster
403            .replicas()
404            .filter_map(|r| {
405                if r.config.location.pending() {
406                    let cluster_ident = match Ident::new(cluster.name.clone()) {
407                        Ok(id) => id,
408                        Err(err) => {
409                            return Some(Err(AdapterError::internal(
410                                "Unexpected error parsing cluster name",
411                                err,
412                            )));
413                        }
414                    };
415                    let replica_ident = match Ident::new(r.name.clone()) {
416                        Ok(id) => id,
417                        Err(err) => {
418                            return Some(Err(AdapterError::internal(
419                                "Unexpected error parsing replica name",
420                                err,
421                            )));
422                        }
423                    };
424                    Some(Ok((cluster_ident, replica_ident, r)))
425                } else {
426                    None
427                }
428            })
429            // Early collection is to handle errors from generating of the
430            // Idents
431            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
432            .into_iter()
433            .map(|(cluster_ident, replica_ident, replica)| {
434                let mut new_replica_config = replica.config.clone();
435                debug!("Promoting replica: {}", replica.name);
436                match new_replica_config.location {
437                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
438                        ref mut pending,
439                        ..
440                    }) => {
441                        *pending = false;
442                    }
443                    mz_controller::clusters::ReplicaLocation::Unmanaged(_) => {}
444                }
445
446                let mut replica_ops = vec![];
447                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
448                if let Some(to_name) = to_name {
449                    replica_ops.push(catalog::Op::RenameClusterReplica {
450                        cluster_id: cluster_id.clone(),
451                        replica_id: replica.replica_id.to_owned(),
452                        name: QualifiedReplica {
453                            cluster: cluster_ident,
454                            replica: replica_ident,
455                        },
456                        to_name: to_name.to_owned(),
457                    });
458                }
459                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
460                    cluster_id,
461                    replica_id: replica.replica_id.to_owned(),
462                    config: new_replica_config,
463                });
464                replica_ops
465            })
466            .flatten()
467            .collect();
468
469        ops.extend(finalize_replicas);
470
471        // Add the Op to update the cluster state
472        ops.push(Op::UpdateClusterConfig {
473            id: cluster_id,
474            name: cluster_name,
475            config: ClusterConfig {
476                variant: ClusterVariant::Managed(new_config),
477                workload_class: workload_class.clone(),
478            },
479        });
480        self.catalog_transact(Some(session), ops).await?;
481        // Remove the cluster being altered from the ConnMeta
482        // pending_cluster_alters BTreeSet
483        self.active_conns
484            .get_mut(session.conn_id())
485            .expect("There must be an active connection")
486            .pending_cluster_alters
487            .remove(&cluster_id);
488
489        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
490            ObjectType::Cluster,
491        )))
492    }
493
494    async fn check_if_pending_replicas_hydrated_stage(
495        &mut self,
496        session: &Session,
497        plan: AlterClusterPlan,
498        new_config: ClusterVariantManaged,
499        workload_class: Option<String>,
500        timeout_time: Instant,
501        on_timeout: OnTimeoutAction,
502        validity: PlanValidity,
503    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
504        // wait and re-signal wait for hydrated if not hydrated
505        let cluster = self.catalog.get_cluster(plan.id);
506        let pending_replicas = cluster
507            .replicas()
508            .filter_map(|r| {
509                if r.config.location.pending() {
510                    Some(r.replica_id.clone())
511                } else {
512                    None
513                }
514            })
515            .collect_vec();
516        // Check For timeout
517        if Instant::now() > timeout_time {
518            // Timed out handle timeout action
519            match on_timeout {
520                OnTimeoutAction::Rollback => {
521                    self.active_conns
522                        .get_mut(session.conn_id())
523                        .expect("There must be an active connection")
524                        .pending_cluster_alters
525                        .remove(&cluster.id);
526                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
527                        .await?;
528                    return Err(AdapterError::AlterClusterTimeout);
529                }
530                OnTimeoutAction::Commit => {
531                    let span = Span::current();
532                    let poll_duration = self
533                        .catalog
534                        .system_config()
535                        .cluster_alter_check_ready_interval()
536                        .clone();
537                    return Ok(StageResult::Handle(mz_ore::task::spawn(
538                        || "Finalize Alter Cluster",
539                        async move {
540                            tokio::time::sleep(poll_duration).await;
541                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
542                                validity,
543                                plan,
544                                new_config,
545                                workload_class,
546                            });
547                            Ok(Box::new(stage))
548                        }
549                        .instrument(span),
550                    )));
551                }
552            }
553        }
554        let compute_hydrated_fut = self
555            .controller
556            .compute
557            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
558            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
559
560        let storage_hydrated = self
561            .controller
562            .storage
563            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
564            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
565
566        let span = Span::current();
567        Ok(StageResult::Handle(mz_ore::task::spawn(
568            || "Alter Cluster: wait for hydrated",
569            async move {
570                let compute_hydrated = compute_hydrated_fut
571                    .await
572                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
573
574                if compute_hydrated && storage_hydrated {
575                    // We're done
576                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
577                        validity,
578                        plan,
579                        new_config: new_config.clone(),
580                        workload_class: workload_class.clone(),
581                    })))
582                } else {
583                    // Check later
584                    tokio::time::sleep(Duration::from_secs(1)).await;
585                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
586                        validity,
587                        plan,
588                        new_config,
589                        workload_class,
590                        timeout_time,
591                        on_timeout,
592                    });
593                    Ok(Box::new(stage))
594                }
595            }
596            .instrument(span),
597        )))
598    }
599
600    #[mz_ore::instrument(level = "debug")]
601    pub(crate) async fn sequence_create_cluster(
602        &mut self,
603        session: &Session,
604        CreateClusterPlan {
605            name,
606            variant,
607            workload_class,
608        }: CreateClusterPlan,
609    ) -> Result<ExecuteResponse, AdapterError> {
610        tracing::debug!("sequence_create_cluster");
611
612        let id_ts = self.get_catalog_write_ts().await;
613        let id = self.catalog().allocate_user_cluster_id(id_ts).await?;
614        // The catalog items for the introspection sources are shared between all replicas
615        // of a compute instance, so we create them unconditionally during instance creation.
616        // Whether a replica actually maintains introspection arrangements is determined by the
617        // per-replica introspection configuration.
618        let introspection_sources = BUILTINS::logs().collect();
619        let cluster_variant = match &variant {
620            CreateClusterVariant::Managed(plan) => {
621                let logging = if let Some(config) = plan.compute.introspection {
622                    ReplicaLogging {
623                        log_logging: config.debugging,
624                        interval: Some(config.interval),
625                    }
626                } else {
627                    ReplicaLogging::default()
628                };
629                ClusterVariant::Managed(ClusterVariantManaged {
630                    size: plan.size.clone(),
631                    availability_zones: plan.availability_zones.clone(),
632                    logging,
633                    replication_factor: plan.replication_factor,
634                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
635                    schedule: plan.schedule.clone(),
636                    auto_scaling_strategy: None,
637                    reconfiguration: None,
638                    burst: None,
639                })
640            }
641            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
642        };
643        let config = ClusterConfig {
644            variant: cluster_variant,
645            workload_class,
646        };
647        let ops = vec![catalog::Op::CreateCluster {
648            id,
649            name: name.clone(),
650            introspection_sources,
651            owner_id: *session.current_role_id(),
652            config,
653        }];
654
655        match variant {
656            CreateClusterVariant::Managed(plan) => {
657                self.sequence_create_managed_cluster(session, plan, id, ops)
658                    .await
659            }
660            CreateClusterVariant::Unmanaged(plan) => {
661                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
662                    .await
663            }
664        }
665    }
666
667    #[mz_ore::instrument(level = "debug")]
668    async fn sequence_create_managed_cluster(
669        &mut self,
670        session: &Session,
671        CreateClusterManagedPlan {
672            availability_zones,
673            compute,
674            replication_factor,
675            size,
676            optimizer_feature_overrides: _,
677            schedule: _,
678        }: CreateClusterManagedPlan,
679        cluster_id: ClusterId,
680        mut ops: Vec<catalog::Op>,
681    ) -> Result<ExecuteResponse, AdapterError> {
682        tracing::debug!("sequence_create_managed_cluster");
683
684        self.ensure_valid_azs(availability_zones.iter())?;
685
686        let role_id = session.role_metadata().current_role;
687        self.catalog.ensure_valid_replica_size(
688            &self
689                .catalog()
690                .get_role_allowed_cluster_sizes(&Some(role_id)),
691            &size,
692            false,
693        )?;
694
695        // Eagerly validate the `max_replicas_per_cluster` limit.
696        // `catalog_transact` will do this validation too, but allocating
697        // replica IDs is expensive enough that we need to do this validation
698        // before allocating replica IDs. See database-issues#6046.
699        if cluster_id.is_user() {
700            self.validate_resource_limit(
701                0,
702                i64::from(replication_factor),
703                SystemVars::max_replicas_per_cluster,
704                "cluster replica",
705                MAX_REPLICAS_PER_CLUSTER.name(),
706            )?;
707        }
708
709        // Pre-allocate replica ids out-of-band via the durable allocator,
710        // picking the id type from the owning cluster. This mirrors how cluster
711        // and item ids are allocated, so nothing allocates a replica id
712        // in-apply.
713        let id_ts = self.get_catalog_write_ts().await;
714        let replica_ids = self
715            .catalog()
716            .allocate_replica_ids(cluster_id, u64::from(replication_factor), id_ts)
717            .await?;
718
719        for (replica_id, replica_name) in replica_ids
720            .into_iter()
721            .zip_eq((0..replication_factor).map(managed_cluster_replica_name))
722        {
723            self.create_managed_cluster_replica_op(
724                cluster_id,
725                replica_id,
726                replica_name,
727                &compute,
728                &size,
729                &mut ops,
730                if availability_zones.is_empty() {
731                    None
732                } else {
733                    Some(availability_zones.as_ref())
734                },
735                false,
736                *session.current_role_id(),
737                ReplicaCreateDropReason::Manual,
738            )?;
739        }
740
741        self.catalog_transact(Some(session), ops).await?;
742
743        // Resolve the new cluster's cluster-scoped overrides now, so its first
744        // plan uses them rather than the env-wide values. Optimizer features are
745        // baked into immutable dataflows, so a value applied on a later sync tick
746        // cannot retrofit an already planned dataflow.
747        //
748        // TODO: this resolves only the cluster-scoped overrides. Replicas created
749        // in this same transaction reach the controller with env-wide config and
750        // pick up their replica-local overrides on the next sync tick, which is
751        // too late for render-frozen flags. The follow-up materialize#37158 folds
752        // replica resolution into the create transaction.
753        self.resolve_scoped_for_new_objects(&BTreeSet::from([cluster_id]), &BTreeSet::new())
754            .await;
755
756        Ok(ExecuteResponse::CreatedCluster)
757    }
758
759    fn create_managed_cluster_replica_op(
760        &self,
761        cluster_id: ClusterId,
762        replica_id: ReplicaId,
763        name: String,
764        compute: &mz_sql::plan::ComputeReplicaConfig,
765        size: &String,
766        ops: &mut Vec<Op>,
767        azs: Option<&[String]>,
768        pending: bool,
769        owner_id: RoleId,
770        reason: ReplicaCreateDropReason,
771    ) -> Result<(), AdapterError> {
772        let location = mz_catalog::durable::ReplicaLocation::Managed {
773            // Concretized below from the cluster config; this intermediate value
774            // is discarded, so the list is left empty here.
775            availability_zones: Vec::new(),
776            billed_as: None,
777            internal: false,
778            size: size.clone(),
779            pending,
780        };
781
782        let logging = if let Some(config) = compute.introspection {
783            ReplicaLogging {
784                log_logging: config.debugging,
785                interval: Some(config.interval),
786            }
787        } else {
788            ReplicaLogging::default()
789        };
790
791        let config = ReplicaConfig {
792            location: self.catalog().concretize_replica_location(
793                location,
794                &self
795                    .catalog()
796                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
797                azs,
798                false,
799            )?,
800            compute: ComputeReplicaConfig { logging },
801        };
802
803        // The caller pre-allocates `replica_id` out-of-band via the durable
804        // allocator, mirroring how cluster and item ids are allocated, so
805        // nothing allocates a replica id in-apply.
806        ops.push(catalog::Op::CreateClusterReplica {
807            cluster_id,
808            replica_id,
809            name,
810            config,
811            owner_id,
812            reason,
813        });
814        Ok(())
815    }
816
817    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
818        &self,
819        azs: I,
820    ) -> Result<(), AdapterError> {
821        let cat_azs = self.catalog().state().availability_zones();
822        for az in azs.into_iter() {
823            if !cat_azs.contains(az) {
824                return Err(AdapterError::InvalidClusterReplicaAz {
825                    az: az.to_string(),
826                    expected: cat_azs.to_vec(),
827                });
828            }
829        }
830        Ok(())
831    }
832
833    #[mz_ore::instrument(level = "debug")]
834    async fn sequence_create_unmanaged_cluster(
835        &mut self,
836        session: &Session,
837        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
838        id: ClusterId,
839        mut ops: Vec<catalog::Op>,
840    ) -> Result<ExecuteResponse, AdapterError> {
841        tracing::debug!("sequence_create_unmanaged_cluster");
842
843        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
844            if let mz_sql::plan::ReplicaConfig::Orchestrated {
845                availability_zone: Some(az),
846                ..
847            } = &r
848            {
849                Some(az)
850            } else {
851                None
852            }
853        }))?;
854
855        // Eagerly validate the `max_replicas_per_cluster` limit.
856        // `catalog_transact` will do this validation too, but allocating
857        // replica IDs is expensive enough that we need to do this validation
858        // before allocating replica IDs. See database-issues#6046.
859        if id.is_user() {
860            self.validate_resource_limit(
861                0,
862                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
863                SystemVars::max_replicas_per_cluster,
864                "cluster replica",
865                MAX_REPLICAS_PER_CLUSTER.name(),
866            )?;
867        }
868
869        // Pre-allocate replica ids out-of-band via the durable allocator,
870        // picking the id type from the owning cluster. This mirrors how cluster
871        // and item ids are allocated, so nothing allocates a replica id
872        // in-apply.
873        let id_ts = self.get_catalog_write_ts().await;
874        let replica_ids = self
875            .catalog()
876            .allocate_replica_ids(id, u64::cast_from(replicas.len()), id_ts)
877            .await?;
878
879        for (replica_id, (replica_name, replica_config)) in replica_ids.into_iter().zip_eq(replicas)
880        {
881            // If the AZ was not specified, choose one, round-robin, from the ones with
882            // the lowest number of configured replicas for this cluster.
883            let (compute, location) = match replica_config {
884                mz_sql::plan::ReplicaConfig::Unorchestrated {
885                    storagectl_addrs,
886                    computectl_addrs,
887                    compute,
888                } => {
889                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
890                        storagectl_addrs,
891                        computectl_addrs,
892                    };
893                    (compute, location)
894                }
895                mz_sql::plan::ReplicaConfig::Orchestrated {
896                    availability_zone,
897                    billed_as,
898                    compute,
899                    internal,
900                    size,
901                } => {
902                    // Only internal users have access to INTERNAL and BILLED AS
903                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
904                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
905                    }
906                    // BILLED AS implies the INTERNAL flag.
907                    if billed_as.is_some() && !internal {
908                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
909                    }
910
911                    let location = mz_catalog::durable::ReplicaLocation::Managed {
912                        // The user-pinned `AVAILABILITY ZONE`, if any, as a zero-
913                        // or one-element list.
914                        availability_zones: availability_zone.into_iter().collect(),
915                        billed_as,
916                        internal,
917                        size: size.clone(),
918                        pending: false,
919                    };
920                    (compute, location)
921                }
922            };
923
924            let logging = if let Some(config) = compute.introspection {
925                ReplicaLogging {
926                    log_logging: config.debugging,
927                    interval: Some(config.interval),
928                }
929            } else {
930                ReplicaLogging::default()
931            };
932
933            let role_id = session.role_metadata().current_role;
934            let config = ReplicaConfig {
935                location: self.catalog().concretize_replica_location(
936                    location,
937                    &self
938                        .catalog()
939                        .get_role_allowed_cluster_sizes(&Some(role_id)),
940                    None,
941                    false,
942                )?,
943                compute: ComputeReplicaConfig { logging },
944            };
945
946            ops.push(catalog::Op::CreateClusterReplica {
947                cluster_id: id,
948                replica_id,
949                name: replica_name.clone(),
950                config,
951                owner_id: *session.current_role_id(),
952                reason: ReplicaCreateDropReason::Manual,
953            });
954        }
955
956        self.catalog_transact(Some(session), ops).await?;
957
958        // Resolve the new cluster's cluster-coherent scoped overrides now (see
959        // the managed path for rationale).
960        self.resolve_scoped_for_new_objects(&BTreeSet::from([id]), &BTreeSet::new())
961            .await;
962
963        Ok(ExecuteResponse::CreatedCluster)
964    }
965
966    #[mz_ore::instrument(level = "debug")]
967    pub(crate) async fn sequence_create_cluster_replica(
968        &mut self,
969        session: &Session,
970        CreateClusterReplicaPlan {
971            name,
972            cluster_id,
973            config,
974        }: CreateClusterReplicaPlan,
975    ) -> Result<ExecuteResponse, AdapterError> {
976        // Choose default AZ if necessary
977        let (compute, location) = match config {
978            mz_sql::plan::ReplicaConfig::Unorchestrated {
979                storagectl_addrs,
980                computectl_addrs,
981                compute,
982            } => {
983                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
984                    storagectl_addrs,
985                    computectl_addrs,
986                };
987                (compute, location)
988            }
989            mz_sql::plan::ReplicaConfig::Orchestrated {
990                availability_zone,
991                billed_as,
992                compute,
993                internal,
994                size,
995            } => {
996                let availability_zone = match availability_zone {
997                    Some(az) => {
998                        self.ensure_valid_azs([&az])?;
999                        Some(az)
1000                    }
1001                    None => None,
1002                };
1003                let location = mz_catalog::durable::ReplicaLocation::Managed {
1004                    // The user-pinned `AVAILABILITY ZONE`, if any, as a zero- or
1005                    // one-element list.
1006                    availability_zones: availability_zone.into_iter().collect(),
1007                    billed_as,
1008                    internal,
1009                    size,
1010                    pending: false,
1011                };
1012                (compute, location)
1013            }
1014        };
1015
1016        let logging = if let Some(config) = compute.introspection {
1017            ReplicaLogging {
1018                log_logging: config.debugging,
1019                interval: Some(config.interval),
1020            }
1021        } else {
1022            ReplicaLogging::default()
1023        };
1024
1025        let role_id = session.role_metadata().current_role;
1026        let config = ReplicaConfig {
1027            location: self.catalog().concretize_replica_location(
1028                location,
1029                &self
1030                    .catalog()
1031                    .get_role_allowed_cluster_sizes(&Some(role_id)),
1032                // Planning ensures all replicas in this codepath
1033                // are unmanaged.
1034                None,
1035                false,
1036            )?,
1037            compute: ComputeReplicaConfig { logging },
1038        };
1039
1040        let cluster = self.catalog().get_cluster(cluster_id);
1041
1042        if let ReplicaLocation::Managed(ManagedReplicaLocation {
1043            internal,
1044            billed_as,
1045            ..
1046        }) = &config.location
1047        {
1048            // Only internal users have access to INTERNAL and BILLED AS
1049            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
1050                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
1051            }
1052            // Managed clusters require the INTERNAL flag.
1053            if cluster.is_managed() && !*internal {
1054                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
1055            }
1056            // BILLED AS implies the INTERNAL flag.
1057            if billed_as.is_some() && !*internal {
1058                coord_bail!("must specify INTERNAL when specifying BILLED AS");
1059            }
1060        }
1061
1062        // Replicas have the same owner as their cluster.
1063        let owner_id = cluster.owner_id();
1064
1065        // Pre-allocate the replica id out-of-band via the durable allocator,
1066        // picking the id type from the target cluster. The target cluster may
1067        // be a system cluster, so dispatch on its id type. This mirrors how
1068        // cluster and item ids are allocated, so nothing allocates a replica id
1069        // in-apply.
1070        let id_ts = self.get_catalog_write_ts().await;
1071        let replica_id = self
1072            .catalog()
1073            .allocate_replica_ids(cluster_id, 1, id_ts)
1074            .await?
1075            .into_element();
1076
1077        let op = catalog::Op::CreateClusterReplica {
1078            cluster_id,
1079            replica_id,
1080            name: name.clone(),
1081            config,
1082            owner_id,
1083            reason: ReplicaCreateDropReason::Manual,
1084        };
1085
1086        self.catalog_transact(Some(session), vec![op]).await?;
1087
1088        Ok(ExecuteResponse::CreatedClusterReplica)
1089    }
1090
1091    /// When this is called by the automated cluster scheduling, `scheduling_decision_reason` should
1092    /// contain information on why is a cluster being turned On/Off. It will be forwarded to the
1093    /// `details` field of the audit log event that records creating or dropping replicas.
1094    ///
1095    /// # Panics
1096    ///
1097    /// Panics if the identified cluster is not a managed cluster.
1098    /// Panics if `new_config` is not a configuration for a managed cluster.
1099    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
1100        &mut self,
1101        session: Option<&Session>,
1102        cluster_id: ClusterId,
1103        new_config: ClusterConfig,
1104        reason: ReplicaCreateDropReason,
1105        strategy: AlterClusterPlanStrategy,
1106    ) -> Result<NeedsFinalization, AdapterError> {
1107        let cluster = self.catalog.get_cluster(cluster_id);
1108        let name = cluster.name().to_string();
1109        let owner_id = cluster.owner_id();
1110
1111        let mut ops = vec![];
1112        let mut finalization_needed = NeedsFinalization::No;
1113
1114        let ClusterVariant::Managed(ClusterVariantManaged {
1115            size,
1116            availability_zones,
1117            logging,
1118            replication_factor,
1119            optimizer_feature_overrides: _,
1120            schedule: _,
1121            auto_scaling_strategy: _,
1122            reconfiguration: _,
1123            burst: _,
1124        }) = &cluster.config.variant
1125        else {
1126            panic!("expected existing managed cluster config");
1127        };
1128        // Clone the existing managed config out of the cluster so the immutable
1129        // catalog borrow can be released before the out-of-band replica id
1130        // allocation below, which needs mutable access to self.
1131        let size = size.clone();
1132        let availability_zones = availability_zones.clone();
1133        let logging = logging.clone();
1134        let replication_factor = *replication_factor;
1135        let ClusterVariant::Managed(ClusterVariantManaged {
1136            size: new_size,
1137            replication_factor: new_replication_factor,
1138            availability_zones: new_availability_zones,
1139            logging: new_logging,
1140            optimizer_feature_overrides: _,
1141            schedule: _,
1142            auto_scaling_strategy: _,
1143            reconfiguration: _,
1144            burst: _,
1145        }) = &new_config.variant
1146        else {
1147            panic!("expected new managed cluster config");
1148        };
1149
1150        let role_id = session.map(|s| s.role_metadata().current_role);
1151        self.catalog.ensure_valid_replica_size(
1152            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
1153            new_size,
1154            false,
1155        )?;
1156
1157        // check for active updates
1158        if cluster.replicas().any(|r| r.config.location.pending()) {
1159            return Err(AlterClusterWhilePendingReplicas);
1160        }
1161
1162        // Resolve existing replica ids by name before releasing the catalog
1163        // borrow, so the drop branches below can build their ops without it.
1164        let replica_id_by_name: BTreeMap<String, ReplicaId> = cluster
1165            .replicas()
1166            .map(|r| (r.name.clone(), r.replica_id))
1167            .collect();
1168
1169        let compute = mz_sql::plan::ComputeReplicaConfig {
1170            introspection: new_logging
1171                .interval
1172                .map(|interval| ComputeReplicaIntrospectionConfig {
1173                    debugging: new_logging.log_logging,
1174                    interval,
1175                }),
1176        };
1177
1178        // Eagerly validate the `max_replicas_per_cluster` limit.
1179        // `catalog_transact` will do this validation too, but allocating
1180        // replica IDs is expensive enough that we need to do this validation
1181        // before allocating replica IDs. See database-issues#6046.
1182        if *new_replication_factor > replication_factor {
1183            if cluster_id.is_user() {
1184                self.validate_resource_limit(
1185                    usize::cast_from(replication_factor),
1186                    i64::from(*new_replication_factor) - i64::from(replication_factor),
1187                    SystemVars::max_replicas_per_cluster,
1188                    "cluster replica",
1189                    MAX_REPLICAS_PER_CLUSTER.name(),
1190                )?;
1191            }
1192        }
1193
1194        // Count exactly as many replica ids as the branches below consume. The
1195        // config-changed branches recreate all replicas; a pure scale-up
1196        // creates only the delta; scale-down and no-op create none.
1197        let config_changed = new_size != &size
1198            || new_availability_zones != &availability_zones
1199            || new_logging != &logging;
1200        let needed_replica_ids = if config_changed {
1201            *new_replication_factor
1202        } else if *new_replication_factor > replication_factor {
1203            *new_replication_factor - replication_factor
1204        } else {
1205            0
1206        };
1207        // Allocate the replica ids out-of-band via the durable allocator, only
1208        // after the eager limit validation above so a rejected alter allocates
1209        // nothing. Pick the id type from the target cluster, which may be a
1210        // system cluster. This mirrors how cluster and item ids are allocated,
1211        // so nothing allocates a replica id in-apply. Fetch the catalog write
1212        // timestamp lazily here, since it needs mutable access to self (the
1213        // cluster borrow above is already released) and scale-down, no-op, and
1214        // automated scheduling turn-off alters must not pay an oracle
1215        // round-trip just to allocate nothing.
1216        let mut new_replica_ids = if needed_replica_ids > 0 {
1217            let id_ts = self.get_catalog_write_ts().await;
1218            self.catalog()
1219                .allocate_replica_ids(cluster_id, u64::from(needed_replica_ids), id_ts)
1220                .await?
1221                .into_iter()
1222        } else {
1223            Vec::<ReplicaId>::new().into_iter()
1224        };
1225
1226        if config_changed {
1227            self.ensure_valid_azs(new_availability_zones.iter())?;
1228            // If we're not doing a zero-downtime reconfig tear down all
1229            // replicas, create new ones else create the pending replicas and
1230            // return early asking for finalization
1231            match strategy {
1232                AlterClusterPlanStrategy::None => {
1233                    let replica_ids_and_reasons = (0..replication_factor)
1234                        .map(managed_cluster_replica_name)
1235                        .filter_map(|name| replica_id_by_name.get(&name).copied())
1236                        .map(|replica_id| {
1237                            catalog::DropObjectInfo::ClusterReplica((
1238                                cluster_id,
1239                                replica_id,
1240                                reason.clone(),
1241                            ))
1242                        })
1243                        .collect();
1244                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
1245                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
1246                        let replica_id = new_replica_ids
1247                            .next()
1248                            .expect("pre-allocated enough replica ids");
1249                        self.create_managed_cluster_replica_op(
1250                            cluster_id,
1251                            replica_id,
1252                            name.clone(),
1253                            &compute,
1254                            new_size,
1255                            &mut ops,
1256                            Some(new_availability_zones.as_ref()),
1257                            false,
1258                            owner_id,
1259                            reason.clone(),
1260                        )?;
1261                    }
1262                }
1263                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
1264                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
1265                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
1266                        let replica_id = new_replica_ids
1267                            .next()
1268                            .expect("pre-allocated enough replica ids");
1269                        self.create_managed_cluster_replica_op(
1270                            cluster_id,
1271                            replica_id,
1272                            name.clone(),
1273                            &compute,
1274                            new_size,
1275                            &mut ops,
1276                            Some(new_availability_zones.as_ref()),
1277                            true,
1278                            owner_id,
1279                            reason.clone(),
1280                        )?;
1281                    }
1282                    finalization_needed = NeedsFinalization::Yes;
1283                }
1284            }
1285        } else if *new_replication_factor < replication_factor {
1286            // Adjust replica count down
1287            let replica_ids = (*new_replication_factor..replication_factor)
1288                .map(managed_cluster_replica_name)
1289                .filter_map(|name| replica_id_by_name.get(&name).copied())
1290                .map(|replica_id| {
1291                    catalog::DropObjectInfo::ClusterReplica((
1292                        cluster_id,
1293                        replica_id,
1294                        reason.clone(),
1295                    ))
1296                })
1297                .collect();
1298            ops.push(catalog::Op::DropObjects(replica_ids));
1299        } else if *new_replication_factor > replication_factor {
1300            // Adjust replica count up
1301            for name in
1302                (replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
1303            {
1304                let replica_id = new_replica_ids
1305                    .next()
1306                    .expect("pre-allocated enough replica ids");
1307                self.create_managed_cluster_replica_op(
1308                    cluster_id,
1309                    replica_id,
1310                    name.clone(),
1311                    &compute,
1312                    new_size,
1313                    &mut ops,
1314                    // AVAILABILITY ZONES hasn't changed, so existing replicas don't need to be
1315                    // rescheduled.
1316                    Some(new_availability_zones.as_ref()),
1317                    false,
1318                    owner_id,
1319                    reason.clone(),
1320                )?;
1321            }
1322        }
1323
1324        // If finalization is needed, finalization should update the cluster
1325        // config.
1326        match finalization_needed {
1327            NeedsFinalization::No => {
1328                ops.push(catalog::Op::UpdateClusterConfig {
1329                    id: cluster_id,
1330                    name: name.clone(),
1331                    config: new_config,
1332                });
1333            }
1334            NeedsFinalization::Yes => {}
1335        }
1336        self.catalog_transact(session, ops).await?;
1337        Ok(finalization_needed)
1338    }
1339
1340    /// # Panics
1341    ///
1342    /// Panics if `new_config` is not a configuration for a managed cluster.
1343    async fn sequence_alter_cluster_unmanaged_to_managed(
1344        &mut self,
1345        session: &Session,
1346        cluster_id: ClusterId,
1347        mut new_config: ClusterConfig,
1348        options: PlanClusterOption,
1349    ) -> Result<(), AdapterError> {
1350        let cluster = self.catalog.get_cluster(cluster_id);
1351        let cluster_name = cluster.name().to_string();
1352
1353        let ClusterVariant::Managed(ClusterVariantManaged {
1354            size: new_size,
1355            replication_factor: new_replication_factor,
1356            availability_zones: new_availability_zones,
1357            logging: _,
1358            optimizer_feature_overrides: _,
1359            schedule: _,
1360            auto_scaling_strategy: _,
1361            reconfiguration: _,
1362            burst: _,
1363        }) = &mut new_config.variant
1364        else {
1365            panic!("expected new managed cluster config");
1366        };
1367
1368        // Validate replication factor parameter
1369        let user_replica_count = cluster
1370            .user_replicas()
1371            .count()
1372            .try_into()
1373            .expect("must_fit");
1374        match options.replication_factor {
1375            AlterOptionParameter::Set(_) => {
1376                // Validate that the replication factor matches the current length only if specified.
1377                if user_replica_count != *new_replication_factor {
1378                    coord_bail!(
1379                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
1380                    );
1381                }
1382            }
1383            _ => {
1384                *new_replication_factor = user_replica_count;
1385            }
1386        }
1387
1388        let mut names = BTreeSet::new();
1389        let mut sizes = BTreeSet::new();
1390
1391        self.ensure_valid_azs(new_availability_zones.iter())?;
1392
1393        // Validate per-replica configuration
1394        for replica in cluster.user_replicas() {
1395            names.insert(replica.name.clone());
1396            match &replica.config.location {
1397                ReplicaLocation::Unmanaged(_) => coord_bail!(
1398                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
1399                ),
1400                ReplicaLocation::Managed(location) => {
1401                    sizes.insert(location.size.clone());
1402
1403                    // An unmanaged cluster's replica carries its single
1404                    // user-pinned AZ (if any) as the sole entry; every pin must
1405                    // fall within the managed cluster's `AVAILABILITY ZONES`.
1406                    for az in &location.availability_zones {
1407                        if !new_availability_zones.contains(az) {
1408                            coord_bail!(
1409                                "unmanaged replica has availability zone {az} which is not \
1410                                in managed {new_availability_zones:?}"
1411                            )
1412                        }
1413                    }
1414                }
1415            }
1416        }
1417
1418        if sizes.is_empty() {
1419            assert!(
1420                cluster.user_replicas().next().is_none(),
1421                "Cluster should not have replicas"
1422            );
1423            // We didn't collect any size, so the user has to name it.
1424            match &options.size {
1425                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
1426                    coord_bail!("Missing SIZE for empty cluster")
1427                }
1428                AlterOptionParameter::Set(_) => {} // Was set within the calling function.
1429            }
1430        } else if sizes.len() == 1 {
1431            let size = sizes.into_iter().next().expect("must exist");
1432            match &options.size {
1433                AlterOptionParameter::Set(sz) if *sz != size => {
1434                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
1435                }
1436                _ => *new_size = size,
1437            }
1438        } else {
1439            let formatted = sizes
1440                .iter()
1441                .map(String::as_str)
1442                .collect::<Vec<_>>()
1443                .join(", ");
1444            coord_bail!(
1445                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
1446            );
1447        }
1448
1449        for i in 0..*new_replication_factor {
1450            let name = managed_cluster_replica_name(i);
1451            names.remove(&name);
1452        }
1453        if !names.is_empty() {
1454            let formatted = names
1455                .iter()
1456                .map(String::as_str)
1457                .collect::<Vec<_>>()
1458                .join(", ");
1459            coord_bail!(
1460                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
1461            );
1462        }
1463
1464        let ops = vec![catalog::Op::UpdateClusterConfig {
1465            id: cluster_id,
1466            name: cluster_name,
1467            config: new_config,
1468        }];
1469
1470        self.catalog_transact(Some(session), ops).await?;
1471        Ok(())
1472    }
1473
1474    async fn sequence_alter_cluster_managed_to_unmanaged(
1475        &mut self,
1476        session: &Session,
1477        cluster_id: ClusterId,
1478        new_config: ClusterConfig,
1479    ) -> Result<(), AdapterError> {
1480        let cluster = self.catalog().get_cluster(cluster_id);
1481
1482        let ops = vec![catalog::Op::UpdateClusterConfig {
1483            id: cluster_id,
1484            name: cluster.name().to_string(),
1485            config: new_config,
1486        }];
1487
1488        self.catalog_transact(Some(session), ops).await?;
1489        Ok(())
1490    }
1491
1492    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
1493        &mut self,
1494        session: &Session,
1495        cluster_id: ClusterId,
1496        new_config: ClusterConfig,
1497        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
1498    ) -> Result<(), AdapterError> {
1499        if !matches!(replicas, AlterOptionParameter::Unchanged) {
1500            coord_bail!("Cannot alter replicas in unmanaged cluster");
1501        }
1502
1503        let cluster = self.catalog().get_cluster(cluster_id);
1504
1505        let ops = vec![catalog::Op::UpdateClusterConfig {
1506            id: cluster_id,
1507            name: cluster.name().to_string(),
1508            config: new_config,
1509        }];
1510
1511        self.catalog_transact(Some(session), ops).await?;
1512        Ok(())
1513    }
1514
1515    pub(crate) async fn sequence_alter_cluster_rename(
1516        &mut self,
1517        ctx: &mut ExecuteContext,
1518        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
1519    ) -> Result<ExecuteResponse, AdapterError> {
1520        let op = Op::RenameCluster {
1521            id,
1522            name,
1523            to_name,
1524            check_reserved_names: true,
1525        };
1526        match self
1527            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
1528            .await
1529        {
1530            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1531            Err(err) => Err(err),
1532        }
1533    }
1534
1535    pub(crate) async fn sequence_alter_cluster_swap(
1536        &mut self,
1537        ctx: &mut ExecuteContext,
1538        AlterClusterSwapPlan {
1539            id_a,
1540            id_b,
1541            name_a,
1542            name_b,
1543            name_temp,
1544        }: AlterClusterSwapPlan,
1545    ) -> Result<ExecuteResponse, AdapterError> {
1546        let op_a = Op::RenameCluster {
1547            id: id_a,
1548            name: name_a.clone(),
1549            to_name: name_temp.clone(),
1550            check_reserved_names: false,
1551        };
1552        let op_b = Op::RenameCluster {
1553            id: id_b,
1554            name: name_b.clone(),
1555            to_name: name_a,
1556            check_reserved_names: false,
1557        };
1558        let op_temp = Op::RenameCluster {
1559            id: id_a,
1560            name: name_temp,
1561            to_name: name_b,
1562            check_reserved_names: false,
1563        };
1564
1565        match self
1566            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
1567                Box::pin(async {})
1568            })
1569            .await
1570        {
1571            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1572            Err(err) => Err(err),
1573        }
1574    }
1575
1576    pub(crate) async fn sequence_alter_cluster_replica_rename(
1577        &mut self,
1578        session: &Session,
1579        AlterClusterReplicaRenamePlan {
1580            cluster_id,
1581            replica_id,
1582            name,
1583            to_name,
1584        }: AlterClusterReplicaRenamePlan,
1585    ) -> Result<ExecuteResponse, AdapterError> {
1586        let op = catalog::Op::RenameClusterReplica {
1587            cluster_id,
1588            replica_id,
1589            name,
1590            to_name,
1591        };
1592        match self.catalog_transact(Some(session), vec![op]).await {
1593            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
1594            Err(err) => Err(err),
1595        }
1596    }
1597
1598    /// Convert a [`AlterSetClusterPlan`] to a sequence of catalog operators and adjust state.
1599    pub(crate) async fn sequence_alter_set_cluster(
1600        &self,
1601        _session: &Session,
1602        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
1603    ) -> Result<ExecuteResponse, AdapterError> {
1604        // TODO: This function needs to be implemented.
1605
1606        // Satisfy Clippy that this is an async func.
1607        async {}.await;
1608        let entry = self.catalog().get_entry(&id);
1609        match entry.item().typ() {
1610            _ => {
1611                // Unexpected; planner permitted unsupported plan.
1612                Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
1613            }
1614        }
1615    }
1616}
1617
/// Returns the replica name used for the zero-based replica `index` of a managed cluster.
///
/// Names are one-based: index `0` yields `"r1"`, index `1` yields `"r2"`, and so on.
fn managed_cluster_replica_name(index: u32) -> String {
    // Widen before adding one so `u32::MAX` cannot overflow (which would panic in debug
    // builds and wrap around to "r0" in release builds).
    format!("r{}", u64::from(index) + 1)
}
1621
/// The type of finalization needed after an
/// operation such as alter_cluster_managed_to_managed.
///
/// Both variants are plain markers and carry no data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NeedsFinalization {
    /// The operation requires a follow-up finalization step.
    Yes,
    /// The operation is complete; no finalization step is required.
    No,
}