mz_adapter/coord/sequencer/inner/cluster.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;
use std::time::{Duration, Instant};

use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::builtin::BUILTINS;
use mz_catalog::memory::objects::{
    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
};
use mz_compute_types::config::ComputeReplicaConfig;
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
    ReplicaLogging,
};
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::role_id::RoleId;
use mz_sql::ast::{Ident, QualifiedReplica};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, ObjectType};
use mz_sql::plan::{
    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
};
use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
use tracing::{Instrument, Span, debug};

use super::return_if_err;
use crate::catalog::{self, Op, ReplicaCreateDropReason};
use crate::coord::{
    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
    Message, PlanValidity, StageResult, Staged,
};
use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};

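/// Suffix appended to the names of replicas created for a zero-downtime
/// (graceful) cluster reconfiguration, e.g. a replica `r1` gets a pending
/// counterpart `r1-pending` (assuming the usual `rN` managed replica naming).
/// During finalization the suffix is stripped, the replica's `pending` flag is
/// cleared, and the original replicas are dropped.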
const PENDING_REPLICA_SUFFIX: &str = "-pending";

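/// [`ClusterStage`] drives `ALTER CLUSTER` through the coordinator's staged
/// execution machinery: each stage either produces a response, an immediate
/// next stage, or a task handle, and re-enters the coordinator via
/// [`Message::ClusterStageReady`].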
impl Staged for ClusterStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            Self::Alter(stage) => &mut stage.validity,
            Self::WaitForHydrated(stage) => &mut stage.validity,
            Self::Finalize(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
        match self {
            Self::Alter(stage) => {
                coord
                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
                    .await
            }
            Self::WaitForHydrated(stage) => {
                let AlterClusterWaitForHydrated {
                    validity,
                    plan,
                    new_config,
                    timeout_time,
                    on_timeout,
                } = stage;
                coord
                    .check_if_pending_replicas_hydrated_stage(
                        ctx.session(),
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                        validity,
                    )
                    .await
            }
            Self::Finalize(stage) => {
                coord
                    .finalize_alter_cluster_stage(
                        ctx.session(),
                        stage.plan.clone(),
                        stage.new_config.clone(),
                    )
                    .await
            }
        }
    }

    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
        Message::ClusterStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
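    /// Sequences an `ALTER CLUSTER` statement (e.g. `ALTER CLUSTER c SET (SIZE ...)`;
    /// shown only as an illustration) as a staged operation: the plan is first
    /// validated, then driven through the [`ClusterStage`] state machine until it
    /// produces a response.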
    #[instrument]
    pub(crate) async fn sequence_alter_cluster_staged(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterClusterPlan,
    ) {
        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

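    /// Validates an `ALTER CLUSTER` plan and captures a [`PlanValidity`] for it,
    /// returning the initial [`ClusterStage::Alter`] stage.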
    #[instrument]
    async fn alter_cluster_validate(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
    ) -> Result<ClusterStage, AdapterError> {
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::new(),
            Some(plan.id.clone()),
            None,
            session.role_metadata().clone(),
        );
        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
    }

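    /// Computes the new cluster configuration from the requested option changes
    /// and dispatches on the managed/unmanaged transition. A managed-to-managed
    /// alteration may request finalization (zero-downtime reconfiguration), in
    /// which case this returns a follow-up [`ClusterStage::Finalize`] or
    /// [`ClusterStage::WaitForHydrated`] stage instead of an immediate response.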
    async fn sequence_alter_cluster_stage(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let AlterClusterPlan {
            id: cluster_id,
            name: _,
            ref options,
            ref strategy,
        } = plan;

        use mz_catalog::memory::objects::ClusterVariant::*;
        use mz_sql::plan::AlterOptionParameter::*;
        let cluster = self.catalog.get_cluster(cluster_id);
        let config = cluster.config.clone();
        let mut new_config = config.clone();

        match (&new_config.variant, &options.managed) {
            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
                // Generate a minimal correct configuration

                // Size and disk adjusted later when sequencing the actual configuration change.
                let size = "".to_string();
                let disk = false;
                let logging = ReplicaLogging {
                    log_logging: false,
                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                };
                new_config.variant = Managed(ClusterVariantManaged {
                    size,
                    availability_zones: Default::default(),
                    logging,
                    replication_factor: 1,
                    disk,
                    optimizer_feature_overrides: Default::default(),
                    schedule: Default::default(),
                });
            }
        }

        match &mut new_config.variant {
            Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                disk,
                optimizer_feature_overrides: _,
                schedule,
            }) => {
                match &options.size {
                    Set(s) => size.clone_from(s),
                    Reset => coord_bail!("SIZE has no default value"),
                    Unchanged => {}
                }
                match &options.disk {
                    Set(d) => *disk = *d,
                    Reset => *disk = self.catalog.system_config().disk_cluster_replicas_default(),
                    Unchanged => {}
                }
                match &options.availability_zones {
                    Set(az) => availability_zones.clone_from(az),
                    Reset => *availability_zones = Default::default(),
                    Unchanged => {}
                }
                match &options.introspection_debugging {
                    Set(id) => logging.log_logging = *id,
                    Reset => logging.log_logging = false,
                    Unchanged => {}
                }
                match &options.introspection_interval {
                    Set(ii) => logging.interval = ii.0,
                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                    Unchanged => {}
                }
                match &options.replication_factor {
                    Set(rf) => *replication_factor = *rf,
                    Reset => {
                        *replication_factor = self
                            .catalog
                            .system_config()
                            .default_cluster_replication_factor()
                    }
                    Unchanged => {}
                }
                match &options.schedule {
                    Set(new_schedule) => {
                        *schedule = new_schedule.clone();
                    }
                    Reset => *schedule = Default::default(),
                    Unchanged => {}
                }
                if !matches!(options.replicas, Unchanged) {
                    coord_bail!("Cannot change REPLICAS of managed clusters");
                }
            }
            Unmanaged => {
                if !matches!(options.size, Unchanged) {
                    coord_bail!("Cannot change SIZE of unmanaged clusters");
                }
                if !matches!(options.availability_zones, Unchanged) {
                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
                }
                if !matches!(options.introspection_debugging, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION DEBUGGING of unmanaged clusters");
                }
                if !matches!(options.introspection_interval, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
                }
                if !matches!(options.replication_factor, Unchanged) {
                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
                }
            }
        }

        match &options.workload_class {
            Set(wc) => new_config.workload_class.clone_from(wc),
            Reset => new_config.workload_class = None,
            Unchanged => {}
        }

        if new_config == config {
            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
                ObjectType::Cluster,
            )));
        }

        let new_workload_class = new_config.workload_class.clone();
        match (&config.variant, &new_config.variant) {
            (Managed(_), Managed(new_config_managed)) => {
                let alter_followup = self
                    .sequence_alter_cluster_managed_to_managed(
                        Some(session),
                        cluster_id,
                        new_config.clone(),
                        ReplicaCreateDropReason::Manual,
                        strategy.clone(),
                    )
                    .await?;
                if alter_followup == NeedsFinalization::Yes {
                    // For non-backgrounded zero-downtime alters, store the
                    // cluster_id in the ConnMeta to allow for cancellation.
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .insert(cluster_id.clone());
                    let new_config_managed = new_config_managed.clone();
                    return match &strategy {
                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
                                .into(),
                        )),
                        AlterClusterPlanStrategy::For(duration) => {
                            let span = Span::current();
                            let plan = plan.clone();
                            let duration = duration.clone().to_owned();
                            Ok(StageResult::Handle(mz_ore::task::spawn(
                                || "Finalize Alter Cluster",
                                async move {
                                    tokio::time::sleep(duration).await;
                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                        validity,
                                        plan,
                                        new_config: new_config_managed,
                                    });
                                    Ok(Box::new(stage))
                                }
                                .instrument(span),
                            )))
                        }
                        AlterClusterPlanStrategy::UntilReady {
                            timeout,
                            on_timeout,
                        } => Ok(StageResult::Immediate(Box::new(
                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                                validity,
                                plan: plan.clone(),
                                new_config: new_config_managed.clone(),
                                timeout_time: Instant::now() + timeout.to_owned(),
                                on_timeout: on_timeout.to_owned(),
                            }),
                        ))),
                    };
                }
            }
            (Unmanaged, Managed(_)) => {
                self.sequence_alter_cluster_unmanaged_to_managed(
                    session,
                    cluster_id,
                    new_config,
                    options.to_owned(),
                )
                .await?;
            }
            (Managed(_), Unmanaged) => {
                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
                    .await?;
            }
            (Unmanaged, Unmanaged) => {
                self.sequence_alter_cluster_unmanaged_to_unmanaged(
                    session,
                    cluster_id,
                    new_config,
                    options.replicas.clone(),
                )
                .await?;
            }
        }

        self.controller
            .update_cluster_workload_class(cluster_id, new_workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

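    /// Finalizes a zero-downtime cluster alteration: drops the old (non-pending)
    /// replicas, promotes the pending replicas by stripping the
    /// [`PENDING_REPLICA_SUFFIX`] from their names and clearing their pending
    /// flag, and writes the new cluster configuration to the catalog.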
    async fn finalize_alter_cluster_stage(
        &mut self,
        session: &Session,
        AlterClusterPlan {
            id: cluster_id,
            name: cluster_name,
            ..
        }: AlterClusterPlan,
        new_config: ClusterVariantManaged,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let workload_class = cluster.config.workload_class.clone();
        let mut ops = vec![];

        // Gather the ops to remove the non-pending replicas, skipping any
        // internal (e.g. billed-as-free) replicas.
        let remove_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if !r.config.location.pending() && !r.config.location.internal() {
                    Some(catalog::DropObjectInfo::ClusterReplica((
                        cluster_id.clone(),
                        r.replica_id,
                        ReplicaCreateDropReason::Manual,
                    )))
                } else {
                    None
                }
            })
            .collect();
        ops.push(catalog::Op::DropObjects(remove_replicas));

        // Gather the ops to remove the "-pending" suffix from the name and set
        // pending to false.
        let finalize_replicas: Vec<catalog::Op> = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    let cluster_ident = match Ident::new(cluster.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing cluster name",
                                err,
                            )));
                        }
                    };
                    let replica_ident = match Ident::new(r.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing replica name",
                                err,
                            )));
                        }
                    };
                    Some(Ok((cluster_ident, replica_ident, r)))
                } else {
                    None
                }
            })
            // Collect early to surface any errors from generating the Idents.
            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
            .into_iter()
            .map(|(cluster_ident, replica_ident, replica)| {
                let mut new_replica_config = replica.config.clone();
                debug!("Promoting replica: {}", replica.name);
                match new_replica_config.location {
                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
                        ref mut pending,
                        ..
                    }) => {
                        *pending = false;
                    }
                    _ => {}
                }

                let mut replica_ops = vec![];
                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
                if let Some(to_name) = to_name {
                    replica_ops.push(catalog::Op::RenameClusterReplica {
                        cluster_id: cluster_id.clone(),
                        replica_id: replica.replica_id.to_owned(),
                        name: QualifiedReplica {
                            cluster: cluster_ident,
                            replica: replica_ident,
                        },
                        to_name: to_name.to_owned(),
                    });
                }
                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
                    cluster_id,
                    replica_id: replica.replica_id.to_owned(),
                    config: new_replica_config,
                });
                replica_ops
            })
            .flatten()
            .collect();

        ops.extend(finalize_replicas);

        // Add the op to update the cluster state.
        ops.push(Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: ClusterConfig {
                variant: ClusterVariant::Managed(new_config),
                workload_class: workload_class.clone(),
            },
        });
        self.catalog_transact(Some(session), ops).await?;
        // Remove the cluster being altered from the ConnMeta
        // pending_cluster_alters BTreeSet.
        self.active_conns
            .get_mut(session.conn_id())
            .expect("There must be an active connection")
            .pending_cluster_alters
            .remove(&cluster_id);

        self.controller
            .update_cluster_workload_class(cluster_id, workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

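    /// Checks whether the pending replicas created for a zero-downtime
    /// alteration are hydrated. On success the stage advances to
    /// [`ClusterStage::Finalize`]; otherwise it re-enters
    /// [`ClusterStage::WaitForHydrated`] after a short sleep. If the timeout has
    /// elapsed, the configured [`OnTimeoutAction`] decides whether to commit or
    /// roll back.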
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        // Check whether the pending replicas are hydrated; if not, wait and
        // re-enter this stage.
        let cluster = self.catalog.get_cluster(plan.id);
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        // Check for a timeout.
        if Instant::now() > timeout_time {
            // Timed out; handle the timeout action.
            match on_timeout {
                OnTimeoutAction::Rollback => {
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                OnTimeoutAction::Commit => {
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // We're done.
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config,
                    })))
                } else {
                    // Check again later.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }

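    /// Sequences a `CREATE CLUSTER` statement: allocates a cluster ID, assembles
    /// the catalog ops, and delegates to the managed or unmanaged creation path
    /// depending on the plan variant (roughly, `CREATE CLUSTER c (SIZE ...)` vs.
    /// `CREATE CLUSTER c REPLICAS (...)`; the SQL is shown only as an
    /// illustration).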
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster(
        &mut self,
        session: &Session,
        CreateClusterPlan {
            name,
            variant,
            workload_class,
        }: CreateClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_cluster");

        let id_ts = self.get_catalog_write_ts().await;
        let id = self.catalog_mut().allocate_user_cluster_id(id_ts).await?;
        // The catalog items for the introspection sources are shared between all replicas
        // of a compute instance, so we create them unconditionally during instance creation.
        // Whether a replica actually maintains introspection arrangements is determined by the
        // per-replica introspection configuration.
        let introspection_sources = BUILTINS::logs().collect();
        let cluster_variant = match &variant {
            CreateClusterVariant::Managed(plan) => {
                let logging = if let Some(config) = plan.compute.introspection {
                    ReplicaLogging {
                        log_logging: config.debugging,
                        interval: Some(config.interval),
                    }
                } else {
                    ReplicaLogging::default()
                };
                ClusterVariant::Managed(ClusterVariantManaged {
                    size: plan.size.clone(),
                    availability_zones: plan.availability_zones.clone(),
                    logging,
                    replication_factor: plan.replication_factor,
                    disk: plan.disk,
                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
                    schedule: plan.schedule.clone(),
                })
            }
            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
        };
        let config = ClusterConfig {
            variant: cluster_variant,
            workload_class,
        };
        let ops = vec![catalog::Op::CreateCluster {
            id,
            name: name.clone(),
            introspection_sources,
            owner_id: *session.current_role_id(),
            config,
        }];

        match variant {
            CreateClusterVariant::Managed(plan) => {
                self.sequence_create_managed_cluster(session, plan, id, ops)
                    .await
            }
            CreateClusterVariant::Unmanaged(plan) => {
                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
                    .await
            }
        }
    }

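    /// Creates a managed cluster: validates the availability zones, the replica
    /// size, and the `max_replicas_per_cluster` limit, then creates
    /// `replication_factor` replicas named by `managed_cluster_replica_name`.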
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_managed_cluster(
        &mut self,
        session: &Session,
        CreateClusterManagedPlan {
            availability_zones,
            compute,
            replication_factor,
            size,
            disk,
            optimizer_feature_overrides: _,
            schedule: _,
        }: CreateClusterManagedPlan,
        cluster_id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_managed_cluster");

        self.ensure_valid_azs(availability_zones.iter())?;

        let role_id = session.role_metadata().current_role;
        self.catalog.ensure_valid_replica_size(
            &self
                .catalog()
                .get_role_allowed_cluster_sizes(&Some(role_id)),
            &size,
        )?;

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if cluster_id.is_user() {
            self.validate_resource_limit(
                0,
                i64::from(replication_factor),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
            self.create_managed_cluster_replica_op(
                cluster_id,
                replica_name,
                &compute,
                &size,
                &mut ops,
                if availability_zones.is_empty() {
                    None
                } else {
                    Some(availability_zones.as_ref())
                },
                disk,
                false,
                *session.current_role_id(),
                ReplicaCreateDropReason::Manual,
            )?;
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(cluster_id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

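    /// Pushes a `CreateClusterReplica` op for one managed replica onto `ops`,
    /// concretizing the replica location (size, availability zone, disk) and
    /// carrying through the `pending` flag used by zero-downtime alterations.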
    fn create_managed_cluster_replica_op(
        &self,
        cluster_id: ClusterId,
        name: String,
        compute: &mz_sql::plan::ComputeReplicaConfig,
        size: &String,
        ops: &mut Vec<Op>,
        azs: Option<&[String]>,
        disk: bool,
        pending: bool,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    ) -> Result<(), AdapterError> {
        let location = mz_catalog::durable::ReplicaLocation::Managed {
            availability_zone: None,
            billed_as: None,
            disk,
            internal: false,
            size: size.clone(),
            pending,
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
                azs,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        ops.push(catalog::Op::CreateClusterReplica {
            cluster_id,
            name,
            config,
            owner_id,
            reason,
        });
        Ok(())
    }

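    /// Ensures that every availability zone in `azs` is one of the availability
    /// zones known to the catalog, returning
    /// [`AdapterError::InvalidClusterReplicaAz`] otherwise.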
    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
        &self,
        azs: I,
    ) -> Result<(), AdapterError> {
        let cat_azs = self.catalog().state().availability_zones();
        for az in azs.into_iter() {
            if !cat_azs.contains(az) {
                return Err(AdapterError::InvalidClusterReplicaAz {
                    az: az.to_string(),
                    expected: cat_azs.to_vec(),
                });
            }
        }
        Ok(())
    }

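    /// Creates an unmanaged cluster from explicitly specified replicas,
    /// validating availability zones, the `max_replicas_per_cluster` limit, and
    /// the INTERNAL / BILLED AS restrictions for non-internal users.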
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if id.is_user() {
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            // If the AZ was not specified, choose one, round-robin, from the ones with
            // the lowest number of configured replicas for this cluster.
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    storage_addrs,
                    computectl_addrs,
                    compute_addrs,
                    workers,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        storage_addrs,
                        computectl_addrs,
                        compute_addrs,
                        workers,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    disk,
                    internal,
                    size,
                } => {
                    // Only internal users have access to INTERNAL and BILLED AS
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    // BILLED AS implies the INTERNAL flag.
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        disk,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            let role_id = session.role_metadata().current_role;
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

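    /// Creates the cluster in the controller after it has been written to the
    /// catalog, creates its replicas, and initializes read policies for its
    /// introspection source indexes.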
    async fn create_cluster(&mut self, cluster_id: ClusterId) {
        let Coordinator {
            catalog,
            controller,
            ..
        } = self;
        let cluster = catalog.get_cluster(cluster_id);
        let cluster_id = cluster.id;
        let introspection_source_ids: Vec<_> =
            cluster.log_indexes.iter().map(|(_, id)| *id).collect();

        controller
            .create_cluster(
                cluster_id,
                mz_controller::clusters::ClusterConfig {
                    arranged_logs: cluster.log_indexes.clone(),
                    workload_class: cluster.config.workload_class.clone(),
                },
            )
            .expect("creating cluster must not fail");

        let replica_ids: Vec<_> = cluster.replicas().map(|r| r.replica_id).collect();
        for replica_id in replica_ids {
            self.create_cluster_replica(cluster_id, replica_id).await;
        }

        if !introspection_source_ids.is_empty() {
            self.initialize_compute_read_policies(
                introspection_source_ids,
                cluster_id,
                CompactionWindow::Default,
            )
            .await;
        }
    }

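    /// Sequences a `CREATE CLUSTER REPLICA` statement: concretizes the replica
    /// location, enforces the INTERNAL / BILLED AS rules, writes the catalog op,
    /// and then creates the replica in the controller.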
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Choose default AZ if necessary
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                storage_addrs,
                computectl_addrs,
                compute_addrs,
                workers,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    storage_addrs,
                    computectl_addrs,
                    compute_addrs,
                    workers,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                disk,
                internal,
                size,
            } => {
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    disk,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let role_id = session.role_metadata().current_role;
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                // Planning ensures all replicas in this codepath
                // are unmanaged.
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            // Only internal users have access to INTERNAL and BILLED AS
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            // Managed clusters require the INTERNAL flag.
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            // BILLED AS implies the INTERNAL flag.
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        // Replicas have the same owner as their cluster.
        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        let id = self
            .catalog()
            .resolve_replica_in_cluster(&cluster_id, &name)
            .expect("just created")
            .replica_id();
        self.create_cluster_replica(cluster_id, id).await;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }

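    /// Creates an already-cataloged replica in the controller and installs the
    /// introspection subscribes for it.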
    async fn create_cluster_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
        let cluster = self.catalog().get_cluster(cluster_id);
        let role = cluster.role();
        let replica_config = cluster
            .replica(replica_id)
            .expect("known to exist")
            .config
            .clone();

        let enable_worker_core_affinity =
            self.catalog().system_config().enable_worker_core_affinity();

        self.controller
            .create_replica(
                cluster_id,
                replica_id,
                role,
                replica_config,
                enable_worker_core_affinity,
            )
            .expect("creating replicas must not fail");

        self.install_introspection_subscribes(cluster_id, replica_id)
            .await;
    }

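    /// Alters a managed cluster to a new managed configuration, creating and
    /// dropping replicas as needed. Returns [`NeedsFinalization::Yes`] when a
    /// zero-downtime strategy created pending replicas that must later be
    /// promoted by the finalize stage.
    ///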
    /// When this is called by the automated cluster scheduling, `scheduling_decision_reason` should
    /// contain information on why a cluster is being turned on or off. It will be forwarded to the
    /// `details` field of the audit log event that records creating or dropping replicas.
    ///
    /// # Panics
    ///
    /// Panics if the identified cluster is not a managed cluster.
    /// Panics if `new_config` is not a configuration for a managed cluster.
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut create_cluster_replicas = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            disk,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            disk: new_disk,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // Check for active updates.
        if cluster.replicas().any(|r| r.config.location.pending()) {
            coord_bail!("Cannot Alter clusters with pending updates.")
        }

        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // Eagerly validate the `max_replicas_per_cluster` limit.
        // `catalog_transact` will do this validation too, but allocating
        // replica IDs is expensive enough that we need to do this validation
        // before allocating replica IDs. See database-issues#6046.
        if new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
            || new_disk != disk
        {
            self.ensure_valid_azs(new_availability_zones.iter())?;
            // If we're not doing a zero-downtime reconfiguration, tear down all
            // replicas and create new ones; otherwise create the pending
            // replicas and return early, asking for finalization.
            match strategy {
                AlterClusterPlanStrategy::None => {
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            *new_disk,
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                }
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            *new_disk,
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Adjust the replica count down.
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Adjust the replica count up.
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    // AVAILABILITY ZONES hasn't changed, so existing replicas don't need to be
                    // rescheduled.
                    Some(new_availability_zones.as_ref()),
                    *new_disk,
                    false,
                    owner_id,
                    reason.clone(),
                )?;
                create_cluster_replicas.push((cluster_id, name))
            }
        }

        // If finalization is needed, finalization should update the cluster
        // config.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name,
                    config: new_config,
                });
            }
            _ => {}
        }
        self.catalog_transact(session, ops.clone()).await?;
        for (cluster_id, replica_name) in create_cluster_replicas {
            let replica_id = self
                .catalog()
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            self.create_cluster_replica(cluster_id, replica_id).await;
        }
        Ok(finalization_needed)
    }

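    /// Converts an unmanaged cluster into a managed cluster, validating that the
    /// existing replicas form a consistent managed configuration (replica names,
    /// sizes, DISK settings, and availability zones).
    ///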
1313    /// # Panics
1314    ///
1315    /// Panics if `new_config` is not a configuration for a managed cluster.
1316    async fn sequence_alter_cluster_unmanaged_to_managed(
1317        &mut self,
1318        session: &Session,
1319        cluster_id: ClusterId,
1320        mut new_config: ClusterConfig,
1321        options: PlanClusterOption,
1322    ) -> Result<(), AdapterError> {
1323        let cluster = self.catalog.get_cluster(cluster_id);
1324        let cluster_name = cluster.name().to_string();
1325
1326        let ClusterVariant::Managed(ClusterVariantManaged {
1327            size: new_size,
1328            replication_factor: new_replication_factor,
1329            availability_zones: new_availability_zones,
1330            logging: _,
1331            disk: new_disk,
1332            optimizer_feature_overrides: _,
1333            schedule: _,
1334        }) = &mut new_config.variant
1335        else {
1336            panic!("expected new managed cluster config");
1337        };
1338
1339        // Validate replication factor parameter
1340        let user_replica_count = cluster
1341            .user_replicas()
1342            .count()
1343            .try_into()
1344            .expect("must_fit");
1345        match options.replication_factor {
1346            AlterOptionParameter::Set(_) => {
1347                // Validate that the replication factor matches the current length only if specified.
1348                if user_replica_count != *new_replication_factor {
1349                    coord_bail!(
1350                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
1351                    );
1352                }
1353            }
1354            _ => {
1355                *new_replication_factor = user_replica_count;
1356            }
1357        }
1358
        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();
        let mut disks = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        // Validate per-replica configuration
        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());
                    disks.insert(location.disk);

                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            // We didn't collect any size, so the user has to specify one.
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                _ => {} // Was set within the calling function.
            }
        } else if sizes.len() == 1 {
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        if disks.len() == 1 {
            let disk = disks.into_iter().next().expect("must exist");
            match &options.disk {
                AlterOptionParameter::Set(ds) if *ds != disk => {
                    coord_bail!(
                        "Cluster replicas with DISK {disk} do not match expected DISK {ds}"
                    );
                }
                _ => *new_disk = disk,
            }
        } else if !disks.is_empty() {
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica DISK options"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

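    /// Converts a managed cluster into an unmanaged cluster.
    ///
    /// This only records the new (unmanaged) configuration in the catalog;
    /// the cluster's replicas are not modified here.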
    async fn sequence_alter_cluster_managed_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

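    /// Applies a new configuration to an already unmanaged cluster.
    ///
    /// Altering the replica set of an unmanaged cluster this way is not
    /// supported, so the `replicas` option must be `Unchanged`.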
    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
    ) -> Result<(), AdapterError> {
        if !matches!(replicas, AlterOptionParameter::Unchanged) {
            coord_bail!("Cannot alter replicas in unmanaged cluster");
        }

        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

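    /// Renames the cluster identified by `id` from `name` to `to_name` within
    /// a DDL transaction, rejecting reserved cluster names.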
    pub(crate) async fn sequence_alter_cluster_rename(
        &mut self,
        session: &mut Session,
        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = Op::RenameCluster {
            id,
            name,
            to_name,
            check_reserved_names: true,
        };
        match self
            .catalog_transact_with_ddl_transaction(session, vec![op])
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

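    /// Swaps the names of two clusters.
    ///
    /// The swap is expressed as three renames applied in a single DDL
    /// transaction: cluster `id_a` is renamed to the temporary name
    /// `name_temp`, cluster `id_b` is renamed to `name_a`, and finally the
    /// temporarily named cluster is renamed to `name_b`. All three renames
    /// skip the reserved-name check.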
    pub(crate) async fn sequence_alter_cluster_swap(
        &mut self,
        session: &mut Session,
        AlterClusterSwapPlan {
            id_a,
            id_b,
            name_a,
            name_b,
            name_temp,
        }: AlterClusterSwapPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op_a = Op::RenameCluster {
            id: id_a,
            name: name_a.clone(),
            to_name: name_temp.clone(),
            check_reserved_names: false,
        };
        let op_b = Op::RenameCluster {
            id: id_b,
            name: name_b.clone(),
            to_name: name_a,
            check_reserved_names: false,
        };
        let op_temp = Op::RenameCluster {
            id: id_a,
            name: name_temp,
            to_name: name_b,
            check_reserved_names: false,
        };

        match self
            .catalog_transact_with_ddl_transaction(session, vec![op_a, op_b, op_temp])
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

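    /// Renames the replica `name` of cluster `cluster_id` to `to_name`.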
    pub(crate) async fn sequence_alter_cluster_replica_rename(
        &mut self,
        session: &Session,
        AlterClusterReplicaRenamePlan {
            cluster_id,
            replica_id,
            name,
            to_name,
        }: AlterClusterReplicaRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::RenameClusterReplica {
            cluster_id,
            replica_id,
            name,
            to_name,
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
            Err(err) => Err(err),
        }
    }

    /// Convert an [`AlterSetClusterPlan`] to a sequence of catalog operations and adjust state.
    pub(crate) async fn sequence_alter_set_cluster(
        &self,
        _session: &Session,
        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO: This function needs to be implemented.

        // Satisfy Clippy that this is an async function.
        async {}.await;
        let entry = self.catalog().get_entry(&id);
        match entry.item().typ() {
            _ => {
                // Unexpected: the planner should not have permitted an unsupported plan.
                Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
            }
        }
    }
}

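/// Returns the name of the `index`-th replica of a managed cluster.
///
/// Names are 1-based; for example (illustrative, not a compiled doctest):
///
/// ```ignore
/// assert_eq!(managed_cluster_replica_name(0), "r1");
/// assert_eq!(managed_cluster_replica_name(1), "r2");
/// ```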
fn managed_cluster_replica_name(index: u32) -> String {
    format!("r{}", index + 1)
}

/// The type of finalization needed after an
/// operation such as `alter_cluster_managed_to_managed`.
#[derive(PartialEq)]
pub(crate) enum NeedsFinalization {
    /// Wait for the provided duration before finalizing.
    Yes,
    /// No finalization is needed.
    No,
}