1use std::collections::BTreeSet;
11use std::time::{Duration, Instant};
12
13use itertools::Itertools;
14use maplit::btreeset;
15use mz_catalog::builtin::BUILTINS;
16use mz_catalog::memory::objects::{
17 ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
18};
19use mz_compute_types::config::ComputeReplicaConfig;
20use mz_controller::clusters::{
21 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
22 ReplicaLogging,
23};
24use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL};
25use mz_ore::cast::CastFrom;
26use mz_ore::instrument;
27use mz_repr::role_id::RoleId;
28use mz_sql::ast::{Ident, QualifiedReplica};
29use mz_sql::catalog::{CatalogCluster, ObjectType};
30use mz_sql::plan::{
31 self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
32 AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
33 ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
34 CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
35};
36use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
39use tracing::{Instrument, Span, debug};
40
41use super::return_if_err;
42use crate::AdapterError::AlterClusterWhilePendingReplicas;
43use crate::catalog::{self, Op, ReplicaCreateDropReason};
44use crate::coord::{
45 AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
46 Message, PlanValidity, StageResult, Staged,
47};
48use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};
49
/// Suffix appended to the names of replicas created for a graceful
/// `ALTER CLUSTER` reconfiguration; stripped again when the pending replica
/// is promoted during finalization.
const PENDING_REPLICA_SUFFIX: &str = "-pending";
51
52impl Staged for ClusterStage {
53 type Ctx = ExecuteContext;
54
55 fn validity(&mut self) -> &mut PlanValidity {
56 match self {
57 Self::Alter(stage) => &mut stage.validity,
58 Self::WaitForHydrated(stage) => &mut stage.validity,
59 Self::Finalize(stage) => &mut stage.validity,
60 }
61 }
62
63 async fn stage(
64 self,
65 coord: &mut Coordinator,
66 ctx: &mut ExecuteContext,
67 ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
68 match self {
69 Self::Alter(stage) => {
70 coord
71 .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
72 .await
73 }
74 Self::WaitForHydrated(stage) => {
75 let AlterClusterWaitForHydrated {
76 validity,
77 plan,
78 new_config,
79 timeout_time,
80 on_timeout,
81 } = stage;
82 coord
83 .check_if_pending_replicas_hydrated_stage(
84 ctx.session(),
85 plan,
86 new_config,
87 timeout_time,
88 on_timeout,
89 validity,
90 )
91 .await
92 }
93 Self::Finalize(stage) => {
94 coord
95 .finalize_alter_cluster_stage(
96 ctx.session(),
97 stage.plan.clone(),
98 stage.new_config.clone(),
99 )
100 .await
101 }
102 }
103 }
104
105 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
106 Message::ClusterStageReady {
107 ctx,
108 span,
109 stage: self,
110 }
111 }
112
113 fn cancel_enabled(&self) -> bool {
114 true
115 }
116}
117
118impl Coordinator {
119 #[instrument]
120 pub(crate) async fn sequence_alter_cluster_staged(
121 &mut self,
122 ctx: ExecuteContext,
123 plan: plan::AlterClusterPlan,
124 ) {
125 let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
126 self.sequence_staged(ctx, Span::current(), stage).await;
127 }
128
129 #[instrument]
130 async fn alter_cluster_validate(
131 &self,
132 session: &Session,
133 plan: plan::AlterClusterPlan,
134 ) -> Result<ClusterStage, AdapterError> {
135 let validity = PlanValidity::new(
136 self.catalog().transient_revision(),
137 BTreeSet::new(),
138 Some(plan.id.clone()),
139 None,
140 session.role_metadata().clone(),
141 );
142 Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
143 }
144
145 async fn sequence_alter_cluster_stage(
146 &mut self,
147 session: &Session,
148 plan: plan::AlterClusterPlan,
149 validity: PlanValidity,
150 ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
151 let AlterClusterPlan {
152 id: cluster_id,
153 name: _,
154 ref options,
155 ref strategy,
156 } = plan;
157
158 use mz_catalog::memory::objects::ClusterVariant::*;
159 use mz_sql::plan::AlterOptionParameter::*;
160 let cluster = self.catalog.get_cluster(cluster_id);
161 let config = cluster.config.clone();
162 let mut new_config = config.clone();
163
164 match (&new_config.variant, &options.managed) {
165 (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
166 (Managed(_), Set(false)) => new_config.variant = Unmanaged,
167 (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
168 (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
169 let size = "".to_string();
173 let logging = ReplicaLogging {
174 log_logging: false,
175 interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
176 };
177 new_config.variant = Managed(ClusterVariantManaged {
178 size,
179 availability_zones: Default::default(),
180 logging,
181 replication_factor: 1,
182 optimizer_feature_overrides: Default::default(),
183 schedule: Default::default(),
184 });
185 }
186 }
187
188 match &mut new_config.variant {
189 Managed(ClusterVariantManaged {
190 size,
191 availability_zones,
192 logging,
193 replication_factor,
194 optimizer_feature_overrides: _,
195 schedule,
196 }) => {
197 match &options.size {
198 Set(s) => size.clone_from(s),
199 Reset => coord_bail!("SIZE has no default value"),
200 Unchanged => {}
201 }
202 match &options.availability_zones {
203 Set(az) => availability_zones.clone_from(az),
204 Reset => *availability_zones = Default::default(),
205 Unchanged => {}
206 }
207 match &options.introspection_debugging {
208 Set(id) => logging.log_logging = *id,
209 Reset => logging.log_logging = false,
210 Unchanged => {}
211 }
212 match &options.introspection_interval {
213 Set(ii) => logging.interval = ii.0,
214 Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
215 Unchanged => {}
216 }
217 match &options.replication_factor {
218 Set(rf) => *replication_factor = *rf,
219 Reset => {
220 *replication_factor = self
221 .catalog
222 .system_config()
223 .default_cluster_replication_factor()
224 }
225 Unchanged => {}
226 }
227 match &options.schedule {
228 Set(new_schedule) => {
229 *schedule = new_schedule.clone();
230 }
231 Reset => *schedule = Default::default(),
232 Unchanged => {}
233 }
234 if !matches!(options.replicas, Unchanged) {
235 coord_bail!("Cannot change REPLICAS of managed clusters");
236 }
237 }
238 Unmanaged => {
239 if !matches!(options.size, Unchanged) {
240 coord_bail!("Cannot change SIZE of unmanaged clusters");
241 }
242 if !matches!(options.availability_zones, Unchanged) {
243 coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
244 }
245 if !matches!(options.introspection_debugging, Unchanged) {
246 coord_bail!("Cannot change INTROSPECTION DEGUBBING of unmanaged clusters");
247 }
248 if !matches!(options.introspection_interval, Unchanged) {
249 coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
250 }
251 if !matches!(options.replication_factor, Unchanged) {
252 coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
253 }
254 }
255 }
256
257 match &options.workload_class {
258 Set(wc) => new_config.workload_class.clone_from(wc),
259 Reset => new_config.workload_class = None,
260 Unchanged => {}
261 }
262
263 if new_config == config {
264 return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
265 ObjectType::Cluster,
266 )));
267 }
268
269 match (&config.variant, &new_config.variant) {
270 (Managed(_), Managed(new_config_managed)) => {
271 let alter_followup = self
272 .sequence_alter_cluster_managed_to_managed(
273 Some(session),
274 cluster_id,
275 new_config.clone(),
276 ReplicaCreateDropReason::Manual,
277 strategy.clone(),
278 )
279 .await?;
280 if alter_followup == NeedsFinalization::Yes {
281 self.active_conns
284 .get_mut(session.conn_id())
285 .expect("There must be an active connection")
286 .pending_cluster_alters
287 .insert(cluster_id.clone());
288 let new_config_managed = new_config_managed.clone();
289 return match &strategy {
290 AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
291 "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
292 .into(),
293 )),
294 AlterClusterPlanStrategy::For(duration) => {
295 let span = Span::current();
296 let plan = plan.clone();
297 let duration = duration.clone().to_owned();
298 Ok(StageResult::Handle(mz_ore::task::spawn(
299 || "Finalize Alter Cluster",
300 async move {
301 tokio::time::sleep(duration).await;
302 let stage = ClusterStage::Finalize(AlterClusterFinalize {
303 validity,
304 plan,
305 new_config: new_config_managed,
306 });
307 Ok(Box::new(stage))
308 }
309 .instrument(span),
310 )))
311 }
312 AlterClusterPlanStrategy::UntilReady {
313 timeout,
314 on_timeout,
315 } => Ok(StageResult::Immediate(Box::new(
316 ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
317 validity,
318 plan: plan.clone(),
319 new_config: new_config_managed.clone(),
320 timeout_time: Instant::now() + timeout.to_owned(),
321 on_timeout: on_timeout.to_owned(),
322 }),
323 ))),
324 };
325 }
326 }
327 (Unmanaged, Managed(_)) => {
328 self.sequence_alter_cluster_unmanaged_to_managed(
329 session,
330 cluster_id,
331 new_config,
332 options.to_owned(),
333 )
334 .await?;
335 }
336 (Managed(_), Unmanaged) => {
337 self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
338 .await?;
339 }
340 (Unmanaged, Unmanaged) => {
341 self.sequence_alter_cluster_unmanaged_to_unmanaged(
342 session,
343 cluster_id,
344 new_config,
345 options.replicas.clone(),
346 )
347 .await?;
348 }
349 }
350
351 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
352 ObjectType::Cluster,
353 )))
354 }
355
356 async fn finalize_alter_cluster_stage(
357 &mut self,
358 session: &Session,
359 AlterClusterPlan {
360 id: cluster_id,
361 name: cluster_name,
362 ..
363 }: AlterClusterPlan,
364 new_config: ClusterVariantManaged,
365 ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
366 let cluster = self.catalog.get_cluster(cluster_id);
367 let workload_class = cluster.config.workload_class.clone();
368 let mut ops = vec![];
369
370 let remove_replicas = cluster
373 .replicas()
374 .filter_map(|r| {
375 if !r.config.location.pending() && !r.config.location.internal() {
376 Some(catalog::DropObjectInfo::ClusterReplica((
377 cluster_id.clone(),
378 r.replica_id,
379 ReplicaCreateDropReason::Manual,
380 )))
381 } else {
382 None
383 }
384 })
385 .collect();
386 ops.push(catalog::Op::DropObjects(remove_replicas));
387
388 let finalize_replicas: Vec<catalog::Op> = cluster
391 .replicas()
392 .filter_map(|r| {
393 if r.config.location.pending() {
394 let cluster_ident = match Ident::new(cluster.name.clone()) {
395 Ok(id) => id,
396 Err(err) => {
397 return Some(Err(AdapterError::internal(
398 "Unexpected error parsing cluster name",
399 err,
400 )));
401 }
402 };
403 let replica_ident = match Ident::new(r.name.clone()) {
404 Ok(id) => id,
405 Err(err) => {
406 return Some(Err(AdapterError::internal(
407 "Unexpected error parsing replica name",
408 err,
409 )));
410 }
411 };
412 Some(Ok((cluster_ident, replica_ident, r)))
413 } else {
414 None
415 }
416 })
417 .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
420 .into_iter()
421 .map(|(cluster_ident, replica_ident, replica)| {
422 let mut new_replica_config = replica.config.clone();
423 debug!("Promoting replica: {}", replica.name);
424 match new_replica_config.location {
425 mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
426 ref mut pending,
427 ..
428 }) => {
429 *pending = false;
430 }
431 mz_controller::clusters::ReplicaLocation::Unmanaged(_) => {}
432 }
433
434 let mut replica_ops = vec![];
435 let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
436 if let Some(to_name) = to_name {
437 replica_ops.push(catalog::Op::RenameClusterReplica {
438 cluster_id: cluster_id.clone(),
439 replica_id: replica.replica_id.to_owned(),
440 name: QualifiedReplica {
441 cluster: cluster_ident,
442 replica: replica_ident,
443 },
444 to_name: to_name.to_owned(),
445 });
446 }
447 replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
448 cluster_id,
449 replica_id: replica.replica_id.to_owned(),
450 config: new_replica_config,
451 });
452 replica_ops
453 })
454 .flatten()
455 .collect();
456
457 ops.extend(finalize_replicas);
458
459 ops.push(Op::UpdateClusterConfig {
461 id: cluster_id,
462 name: cluster_name,
463 config: ClusterConfig {
464 variant: ClusterVariant::Managed(new_config),
465 workload_class: workload_class.clone(),
466 },
467 });
468 self.catalog_transact(Some(session), ops).await?;
469 self.active_conns
472 .get_mut(session.conn_id())
473 .expect("There must be an active connection")
474 .pending_cluster_alters
475 .remove(&cluster_id);
476
477 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
478 ObjectType::Cluster,
479 )))
480 }
481
    /// Polling stage for graceful `ALTER CLUSTER`: checks whether all pending
    /// replicas are hydrated and either proceeds to finalization, keeps
    /// waiting, or handles expiry of the configured timeout.
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        // Collect the IDs of all still-pending replicas on the cluster.
        let cluster = self.catalog.get_cluster(plan.id);
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        // Timeout handling: either roll back the reconfiguration or commit
        // whatever state the pending replicas are in.
        if Instant::now() > timeout_time {
            match on_timeout {
                OnTimeoutAction::Rollback => {
                    // Drop the pending replicas, clear the in-flight marker on
                    // this connection, and surface a timeout error.
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                OnTimeoutAction::Commit => {
                    // Proceed to finalization after one more poll interval.
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        // Kick off hydration checks for both compute and storage collections
        // on the pending replicas.
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        // Await the compute check off the coordinator task; either advance to
        // finalization or re-enter this stage after a short sleep.
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // All pending replicas are hydrated: finalize the alter.
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config,
                    })))
                } else {
                    // Not yet hydrated: sleep briefly and poll again.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }
583
    /// Sequences `CREATE CLUSTER`: allocates a cluster ID, builds the
    /// `CreateCluster` catalog op, and dispatches to the managed or unmanaged
    /// creation path, which adds the replica ops and commits the transaction.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster(
        &mut self,
        session: &Session,
        CreateClusterPlan {
            name,
            variant,
            workload_class,
        }: CreateClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_cluster");

        let id_ts = self.get_catalog_write_ts().await;
        let id = self.catalog().allocate_user_cluster_id(id_ts).await?;
        // Every cluster gets the full set of builtin introspection log sources.
        let introspection_sources = BUILTINS::logs().collect();
        let cluster_variant = match &variant {
            CreateClusterVariant::Managed(plan) => {
                // Translate the plan's introspection settings into replica
                // logging; absent settings fall back to the default.
                let logging = if let Some(config) = plan.compute.introspection {
                    ReplicaLogging {
                        log_logging: config.debugging,
                        interval: Some(config.interval),
                    }
                } else {
                    ReplicaLogging::default()
                };
                ClusterVariant::Managed(ClusterVariantManaged {
                    size: plan.size.clone(),
                    availability_zones: plan.availability_zones.clone(),
                    logging,
                    replication_factor: plan.replication_factor,
                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
                    schedule: plan.schedule.clone(),
                })
            }
            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
        };
        let config = ClusterConfig {
            variant: cluster_variant,
            workload_class,
        };
        let ops = vec![catalog::Op::CreateCluster {
            id,
            name: name.clone(),
            introspection_sources,
            owner_id: *session.current_role_id(),
            config,
        }];

        // Replica ops and the catalog transaction happen in the
        // variant-specific helpers below.
        match variant {
            CreateClusterVariant::Managed(plan) => {
                self.sequence_create_managed_cluster(session, plan, id, ops)
                    .await
            }
            CreateClusterVariant::Unmanaged(plan) => {
                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
                    .await
            }
        }
    }
647
648 #[mz_ore::instrument(level = "debug")]
649 async fn sequence_create_managed_cluster(
650 &mut self,
651 session: &Session,
652 CreateClusterManagedPlan {
653 availability_zones,
654 compute,
655 replication_factor,
656 size,
657 optimizer_feature_overrides: _,
658 schedule: _,
659 }: CreateClusterManagedPlan,
660 cluster_id: ClusterId,
661 mut ops: Vec<catalog::Op>,
662 ) -> Result<ExecuteResponse, AdapterError> {
663 tracing::debug!("sequence_create_managed_cluster");
664
665 self.ensure_valid_azs(availability_zones.iter())?;
666
667 let role_id = session.role_metadata().current_role;
668 self.catalog.ensure_valid_replica_size(
669 &self
670 .catalog()
671 .get_role_allowed_cluster_sizes(&Some(role_id)),
672 &size,
673 )?;
674
675 if cluster_id.is_user() {
680 self.validate_resource_limit(
681 0,
682 i64::from(replication_factor),
683 SystemVars::max_replicas_per_cluster,
684 "cluster replica",
685 MAX_REPLICAS_PER_CLUSTER.name(),
686 )?;
687 }
688
689 for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
690 self.create_managed_cluster_replica_op(
691 cluster_id,
692 replica_name,
693 &compute,
694 &size,
695 &mut ops,
696 if availability_zones.is_empty() {
697 None
698 } else {
699 Some(availability_zones.as_ref())
700 },
701 false,
702 *session.current_role_id(),
703 ReplicaCreateDropReason::Manual,
704 )?;
705 }
706
707 self.catalog_transact(Some(session), ops).await?;
708
709 Ok(ExecuteResponse::CreatedCluster)
710 }
711
712 fn create_managed_cluster_replica_op(
713 &self,
714 cluster_id: ClusterId,
715 name: String,
716 compute: &mz_sql::plan::ComputeReplicaConfig,
717 size: &String,
718 ops: &mut Vec<Op>,
719 azs: Option<&[String]>,
720 pending: bool,
721 owner_id: RoleId,
722 reason: ReplicaCreateDropReason,
723 ) -> Result<(), AdapterError> {
724 let location = mz_catalog::durable::ReplicaLocation::Managed {
725 availability_zone: None,
726 billed_as: None,
727 internal: false,
728 size: size.clone(),
729 pending,
730 };
731
732 let logging = if let Some(config) = compute.introspection {
733 ReplicaLogging {
734 log_logging: config.debugging,
735 interval: Some(config.interval),
736 }
737 } else {
738 ReplicaLogging::default()
739 };
740
741 let config = ReplicaConfig {
742 location: self.catalog().concretize_replica_location(
743 location,
744 &self
745 .catalog()
746 .get_role_allowed_cluster_sizes(&Some(owner_id)),
747 azs,
748 )?,
749 compute: ComputeReplicaConfig { logging },
750 };
751
752 ops.push(catalog::Op::CreateClusterReplica {
753 cluster_id,
754 name,
755 config,
756 owner_id,
757 reason,
758 });
759 Ok(())
760 }
761
762 fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
763 &self,
764 azs: I,
765 ) -> Result<(), AdapterError> {
766 let cat_azs = self.catalog().state().availability_zones();
767 for az in azs.into_iter() {
768 if !cat_azs.contains(az) {
769 return Err(AdapterError::InvalidClusterReplicaAz {
770 az: az.to_string(),
771 expected: cat_azs.to_vec(),
772 });
773 }
774 }
775 Ok(())
776 }
777
    /// Sequences creation of an unmanaged cluster: validates the per-replica
    /// configs, appends a `CreateClusterReplica` op for each, and commits the
    /// whole batch (cluster plus replicas) in one catalog transaction.
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        // Validate any explicitly requested availability zones up front.
        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        // Only user clusters count against the per-cluster replica limit.
        if id.is_user() {
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            // Normalize each plan-level replica config into a durable location.
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    computectl_addrs,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        computectl_addrs,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    internal,
                    size,
                } => {
                    // INTERNAL / BILLED AS are restricted to internal users,
                    // and BILLED AS additionally requires INTERNAL.
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            // Introspection settings translate into replica logging.
            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            // Concretize against the sizes allowed for the current role.
            let role_id = session.role_metadata().current_role;
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        Ok(ExecuteResponse::CreatedCluster)
    }
890
    /// Sequences `CREATE CLUSTER REPLICA` on an existing cluster, validating
    /// availability zones and internal-only options before committing the
    /// catalog op.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Normalize the plan-level replica config into a durable location.
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    computectl_addrs,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                internal,
                size,
            } => {
                // An explicitly requested AZ must be known to the catalog.
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        // Introspection settings translate into replica logging.
        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        // Concretize against the sizes allowed for the current role. No AZ
        // candidate list is passed; any requested AZ was validated above.
        let role_id = session.role_metadata().current_role;
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        // INTERNAL / BILLED AS are restricted to internal users; replicas
        // added to managed clusters must be internal.
        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        // The new replica inherits the cluster's owner.
        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }
998
    /// Applies a managed-to-managed `ALTER CLUSTER`.
    ///
    /// Size/AZ/logging changes require replacing all replicas — either
    /// immediately (strategy `None`) or, for graceful strategies, by creating
    /// `-pending`-suffixed pending replicas that a later stage promotes. Pure
    /// replication-factor changes add or drop replicas in place. Returns
    /// whether a finalization stage is still required.
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        // Both the current and the new variant must be managed here; callers
        // guarantee this, so anything else is a programming error.
        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // The new size must be permitted for the current role (if any).
        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // Only one graceful reconfiguration may be in flight per cluster.
        if cluster.replicas().any(|r| r.config.location.pending()) {
            return Err(AlterClusterWhilePendingReplicas);
        }

        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // Increasing the replica count is subject to the per-cluster limit
        // (user clusters only); the delta is what gets validated.
        if new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
        {
            // Replica-affecting change: every replica must be replaced.
            self.ensure_valid_azs(new_availability_zones.iter())?;
            match strategy {
                AlterClusterPlanStrategy::None => {
                    // Immediate strategy: drop all existing replicas and
                    // recreate them with the new settings in one transaction.
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                }
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    // Graceful strategy: create suffixed pending replicas now;
                    // a later stage promotes them and drops the old replicas.
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Pure scale-down: drop the highest-numbered replicas.
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Pure scale-up: create just the missing replicas.
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    Some(new_availability_zones.as_ref()),
                    false,
                    owner_id,
                    reason.clone(),
                )?;
            }
        }

        // When finalization is needed the config update is deferred to the
        // finalize stage; otherwise commit it with the replica ops now.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name: name.clone(),
                    config: new_config,
                });
            }
            NeedsFinalization::Yes => {}
        }
        self.catalog_transact(session, ops).await?;
        Ok(finalization_needed)
    }
1185
    /// Sequences `ALTER CLUSTER ... SET (MANAGED = true)`: converts an
    /// unmanaged cluster (whose replicas were created individually by the
    /// user) into a managed cluster.
    ///
    /// The conversion only validates and rewrites the cluster's catalog
    /// config; the existing replicas are adopted in place. For that to be
    /// sound, the current replica set must already look exactly like one a
    /// managed cluster would have provisioned:
    ///
    /// * the user replica count must equal the (given or inferred)
    ///   REPLICATION FACTOR,
    /// * every replica must itself be managed, all of a single uniform size
    ///   matching the requested SIZE (if one was given),
    /// * each replica's pinned availability zone (if any) must be one of the
    ///   cluster's availability zones,
    /// * replica names must be exactly the managed naming scheme `r1..rN`.
    ///
    /// On success, a single `UpdateClusterConfig` op is committed to the
    /// catalog.
    ///
    /// # Panics
    ///
    /// Panics if `new_config.variant` is not `ClusterVariant::Managed`;
    /// callers must only invoke this for plans that switch to managed.
    async fn sequence_alter_cluster_unmanaged_to_managed(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        mut new_config: ClusterConfig,
        options: PlanClusterOption,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let cluster_name = cluster.name().to_string();

        // Destructure the target managed config so the fields below can be
        // filled in from the existing replica set where the plan left them
        // unspecified.
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: _,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &mut new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let user_replica_count = cluster
            .user_replicas()
            .count()
            .try_into()
            .expect("must_fit");
        match options.replication_factor {
            AlterOptionParameter::Set(_) => {
                // An explicit REPLICATION FACTOR must match the number of
                // replicas that actually exist, since we adopt them as-is.
                if user_replica_count != *new_replication_factor {
                    coord_bail!(
                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
                    );
                }
            }
            _ => {
                // No explicit factor: infer it from the current replica count.
                *new_replication_factor = user_replica_count;
            }
        }

        // Collected across all user replicas to validate names and size
        // uniformity below.
        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        // Validate per-replica invariants: all replicas managed, and any
        // pinned availability zone drawn from the cluster's zones.
        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());

                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            // No replicas at all: the size cannot be inferred, so the plan
            // must have supplied SIZE explicitly.
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                AlterOptionParameter::Set(_) => {}
            }
        } else if sizes.len() == 1 {
            // Exactly one distinct size: adopt it, unless an explicit SIZE
            // disagrees with it.
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            // Mixed sizes cannot be represented by a managed cluster.
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        // Remove the expected managed names (r1..rN); anything left over is a
        // replica whose name doesn't fit the managed naming scheme.
        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }
1315
1316 async fn sequence_alter_cluster_managed_to_unmanaged(
1317 &mut self,
1318 session: &Session,
1319 cluster_id: ClusterId,
1320 new_config: ClusterConfig,
1321 ) -> Result<(), AdapterError> {
1322 let cluster = self.catalog().get_cluster(cluster_id);
1323
1324 let ops = vec![catalog::Op::UpdateClusterConfig {
1325 id: cluster_id,
1326 name: cluster.name().to_string(),
1327 config: new_config,
1328 }];
1329
1330 self.catalog_transact(Some(session), ops).await?;
1331 Ok(())
1332 }
1333
1334 async fn sequence_alter_cluster_unmanaged_to_unmanaged(
1335 &mut self,
1336 session: &Session,
1337 cluster_id: ClusterId,
1338 new_config: ClusterConfig,
1339 replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
1340 ) -> Result<(), AdapterError> {
1341 if !matches!(replicas, AlterOptionParameter::Unchanged) {
1342 coord_bail!("Cannot alter replicas in unmanaged cluster");
1343 }
1344
1345 let cluster = self.catalog().get_cluster(cluster_id);
1346
1347 let ops = vec![catalog::Op::UpdateClusterConfig {
1348 id: cluster_id,
1349 name: cluster.name().to_string(),
1350 config: new_config,
1351 }];
1352
1353 self.catalog_transact(Some(session), ops).await?;
1354 Ok(())
1355 }
1356
1357 pub(crate) async fn sequence_alter_cluster_rename(
1358 &mut self,
1359 ctx: &mut ExecuteContext,
1360 AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
1361 ) -> Result<ExecuteResponse, AdapterError> {
1362 let op = Op::RenameCluster {
1363 id,
1364 name,
1365 to_name,
1366 check_reserved_names: true,
1367 };
1368 match self
1369 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
1370 .await
1371 {
1372 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1373 Err(err) => Err(err),
1374 }
1375 }
1376
1377 pub(crate) async fn sequence_alter_cluster_swap(
1378 &mut self,
1379 ctx: &mut ExecuteContext,
1380 AlterClusterSwapPlan {
1381 id_a,
1382 id_b,
1383 name_a,
1384 name_b,
1385 name_temp,
1386 }: AlterClusterSwapPlan,
1387 ) -> Result<ExecuteResponse, AdapterError> {
1388 let op_a = Op::RenameCluster {
1389 id: id_a,
1390 name: name_a.clone(),
1391 to_name: name_temp.clone(),
1392 check_reserved_names: false,
1393 };
1394 let op_b = Op::RenameCluster {
1395 id: id_b,
1396 name: name_b.clone(),
1397 to_name: name_a,
1398 check_reserved_names: false,
1399 };
1400 let op_temp = Op::RenameCluster {
1401 id: id_a,
1402 name: name_temp,
1403 to_name: name_b,
1404 check_reserved_names: false,
1405 };
1406
1407 match self
1408 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
1409 Box::pin(async {})
1410 })
1411 .await
1412 {
1413 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1414 Err(err) => Err(err),
1415 }
1416 }
1417
1418 pub(crate) async fn sequence_alter_cluster_replica_rename(
1419 &mut self,
1420 session: &Session,
1421 AlterClusterReplicaRenamePlan {
1422 cluster_id,
1423 replica_id,
1424 name,
1425 to_name,
1426 }: AlterClusterReplicaRenamePlan,
1427 ) -> Result<ExecuteResponse, AdapterError> {
1428 let op = catalog::Op::RenameClusterReplica {
1429 cluster_id,
1430 replica_id,
1431 name,
1432 to_name,
1433 };
1434 match self.catalog_transact(Some(session), vec![op]).await {
1435 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
1436 Err(err) => Err(err),
1437 }
1438 }
1439
1440 pub(crate) async fn sequence_alter_set_cluster(
1442 &self,
1443 _session: &Session,
1444 AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
1445 ) -> Result<ExecuteResponse, AdapterError> {
1446 async {}.await;
1450 let entry = self.catalog().get_entry(&id);
1451 match entry.item().typ() {
1452 _ => {
1453 Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
1455 }
1456 }
1457 }
1458}
1459
/// Returns the canonical name of the `index`th replica of a managed cluster.
/// Managed replica names are 1-based: index 0 maps to `"r1"`, index 1 to
/// `"r2"`, and so on.
fn managed_cluster_replica_name(index: u32) -> String {
    let ordinal = index + 1;
    format!("r{ordinal}")
}
1463
/// Whether an `ALTER CLUSTER` requires a later finalization step before the
/// new cluster config is written to the catalog, or can be committed
/// immediately in the current catalog transaction.
// `Debug` and `Eq` added alongside the existing `PartialEq`: the derives are
// purely additive and make the type usable in assertions and diagnostics.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum NeedsFinalization {
    /// A follow-up finalization step must run; the config update is deferred
    /// until then.
    Yes,
    /// No finalization needed; the config update is committed right away.
    No,
}