1use std::collections::BTreeSet;
11use std::time::{Duration, Instant};
12
13use itertools::Itertools;
14use maplit::btreeset;
15use mz_catalog::builtin::BUILTINS;
16use mz_catalog::memory::objects::{
17 ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
18};
19use mz_compute_types::config::ComputeReplicaConfig;
20use mz_controller::clusters::{
21 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
22 ReplicaLogging,
23};
24use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL};
25use mz_ore::cast::CastFrom;
26use mz_ore::instrument;
27use mz_repr::role_id::RoleId;
28use mz_sql::ast::{Ident, QualifiedReplica};
29use mz_sql::catalog::{CatalogCluster, ObjectType};
30use mz_sql::plan::{
31 self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
32 AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
33 ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
34 CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
35};
36use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
39use tracing::{Instrument, Span, debug};
40
41use super::return_if_err;
42use crate::AdapterError::AlterClusterWhilePendingReplicas;
43use crate::catalog::{self, Op, ReplicaCreateDropReason};
44use crate::coord::{
45 AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
46 Message, PlanValidity, StageResult, Staged,
47};
48use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};
49
/// Suffix appended to replica names created during a graceful `ALTER CLUSTER`
/// reconfiguration. Replicas carrying this suffix are "pending" until the
/// finalize stage promotes them, at which point the suffix is stripped again.
const PENDING_REPLICA_SUFFIX: &str = "-pending";
51
impl Staged for ClusterStage {
    type Ctx = ExecuteContext;

    /// Returns the stage's `PlanValidity`, used by the staged-execution
    /// driver to re-validate the plan before each stage runs.
    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            Self::Alter(stage) => &mut stage.validity,
            Self::WaitForHydrated(stage) => &mut stage.validity,
            Self::Finalize(stage) => &mut stage.validity,
        }
    }

    /// Executes the current stage by dispatching to the corresponding
    /// coordinator method, returning either a final response or the next
    /// stage to run.
    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
        match self {
            Self::Alter(stage) => {
                coord
                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
                    .await
            }
            Self::WaitForHydrated(stage) => {
                let AlterClusterWaitForHydrated {
                    validity,
                    plan,
                    new_config,
                    workload_class,
                    timeout_time,
                    on_timeout,
                } = stage;
                coord
                    .check_if_pending_replicas_hydrated_stage(
                        ctx.session(),
                        plan,
                        new_config,
                        workload_class,
                        timeout_time,
                        on_timeout,
                        validity,
                    )
                    .await
            }
            Self::Finalize(stage) => {
                coord
                    .finalize_alter_cluster_stage(
                        ctx.session(),
                        stage.plan.clone(),
                        stage.new_config.clone(),
                        stage.workload_class.clone(),
                    )
                    .await
            }
        }
    }

    /// Wraps this stage into the coordinator message that re-enters the
    /// staged-execution machinery.
    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
        Message::ClusterStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    /// Staged cluster alterations may be canceled between stages.
    fn cancel_enabled(&self) -> bool {
        true
    }
}
120
121impl Coordinator {
    /// Entry point for a staged `ALTER CLUSTER`: validates the plan into the
    /// initial `ClusterStage` and hands it to the generic staged-execution
    /// driver.
    #[instrument]
    pub(crate) async fn sequence_alter_cluster_staged(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterClusterPlan,
    ) {
        // On validation failure `return_if_err!` reports the error on `ctx`
        // and returns early.
        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
        self.sequence_staged(ctx, Span::current(), stage).await;
    }
131
132 #[instrument]
133 async fn alter_cluster_validate(
134 &self,
135 session: &Session,
136 plan: plan::AlterClusterPlan,
137 ) -> Result<ClusterStage, AdapterError> {
138 let validity = PlanValidity::new(
139 self.catalog().transient_revision(),
140 BTreeSet::new(),
141 Some(plan.id.clone()),
142 None,
143 session.role_metadata().clone(),
144 );
145 Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
146 }
147
148 async fn sequence_alter_cluster_stage(
149 &mut self,
150 session: &Session,
151 plan: plan::AlterClusterPlan,
152 validity: PlanValidity,
153 ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
154 let AlterClusterPlan {
155 id: cluster_id,
156 name: _,
157 ref options,
158 ref strategy,
159 } = plan;
160
161 use mz_catalog::memory::objects::ClusterVariant::*;
162 use mz_sql::plan::AlterOptionParameter::*;
163 let cluster = self.catalog.get_cluster(cluster_id);
164 let config = cluster.config.clone();
165 let mut new_config = config.clone();
166
167 match (&new_config.variant, &options.managed) {
168 (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
169 (Managed(_), Set(false)) => new_config.variant = Unmanaged,
170 (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
171 (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
172 let size = "".to_string();
176 let logging = ReplicaLogging {
177 log_logging: false,
178 interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
179 };
180 new_config.variant = Managed(ClusterVariantManaged {
181 size,
182 availability_zones: Default::default(),
183 logging,
184 replication_factor: 1,
185 optimizer_feature_overrides: Default::default(),
186 schedule: Default::default(),
187 });
188 }
189 }
190
191 match &mut new_config.variant {
192 Managed(ClusterVariantManaged {
193 size,
194 availability_zones,
195 logging,
196 replication_factor,
197 optimizer_feature_overrides: _,
198 schedule,
199 }) => {
200 match &options.size {
201 Set(s) => size.clone_from(s),
202 Reset => coord_bail!("SIZE has no default value"),
203 Unchanged => {}
204 }
205 match &options.availability_zones {
206 Set(az) => availability_zones.clone_from(az),
207 Reset => *availability_zones = Default::default(),
208 Unchanged => {}
209 }
210 match &options.introspection_debugging {
211 Set(id) => logging.log_logging = *id,
212 Reset => logging.log_logging = false,
213 Unchanged => {}
214 }
215 match &options.introspection_interval {
216 Set(ii) => logging.interval = ii.0,
217 Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
218 Unchanged => {}
219 }
220 match &options.replication_factor {
221 Set(rf) => *replication_factor = *rf,
222 Reset => {
223 *replication_factor = self
224 .catalog
225 .system_config()
226 .default_cluster_replication_factor()
227 }
228 Unchanged => {}
229 }
230 match &options.schedule {
231 Set(new_schedule) => {
232 *schedule = new_schedule.clone();
233 }
234 Reset => *schedule = Default::default(),
235 Unchanged => {}
236 }
237 if !matches!(options.replicas, Unchanged) {
238 coord_bail!("Cannot change REPLICAS of managed clusters");
239 }
240 }
241 Unmanaged => {
242 if !matches!(options.size, Unchanged) {
243 coord_bail!("Cannot change SIZE of unmanaged clusters");
244 }
245 if !matches!(options.availability_zones, Unchanged) {
246 coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
247 }
248 if !matches!(options.introspection_debugging, Unchanged) {
249 coord_bail!("Cannot change INTROSPECTION DEGUBBING of unmanaged clusters");
250 }
251 if !matches!(options.introspection_interval, Unchanged) {
252 coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
253 }
254 if !matches!(options.replication_factor, Unchanged) {
255 coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
256 }
257 }
258 }
259
260 match &options.workload_class {
261 Set(wc) => new_config.workload_class.clone_from(wc),
262 Reset => new_config.workload_class = None,
263 Unchanged => {}
264 }
265
266 if new_config == config {
267 return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
268 ObjectType::Cluster,
269 )));
270 }
271
272 match (&config.variant, &new_config.variant) {
273 (Managed(_), Managed(new_config_managed)) => {
274 let alter_followup = self
275 .sequence_alter_cluster_managed_to_managed(
276 Some(session),
277 cluster_id,
278 new_config.clone(),
279 ReplicaCreateDropReason::Manual,
280 strategy.clone(),
281 )
282 .await?;
283 if alter_followup == NeedsFinalization::Yes {
284 self.active_conns
287 .get_mut(session.conn_id())
288 .expect("There must be an active connection")
289 .pending_cluster_alters
290 .insert(cluster_id.clone());
291 let new_config_managed = new_config_managed.clone();
292 return match &strategy {
293 AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
294 "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
295 .into(),
296 )),
297 AlterClusterPlanStrategy::For(duration) => {
298 let span = Span::current();
299 let plan = plan.clone();
300 let duration = duration.clone().to_owned();
301 let workload_class = new_config.workload_class.clone();
302 Ok(StageResult::Handle(mz_ore::task::spawn(
303 || "Finalize Alter Cluster",
304 async move {
305 tokio::time::sleep(duration).await;
306 let stage = ClusterStage::Finalize(AlterClusterFinalize {
307 validity,
308 plan,
309 new_config: new_config_managed,
310 workload_class,
311 });
312 Ok(Box::new(stage))
313 }
314 .instrument(span),
315 )))
316 }
317 AlterClusterPlanStrategy::UntilReady {
318 timeout,
319 on_timeout,
320 } => Ok(StageResult::Immediate(Box::new(
321 ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
322 validity,
323 plan: plan.clone(),
324 new_config: new_config_managed.clone(),
325 workload_class: new_config.workload_class.clone(),
326 timeout_time: Instant::now() + timeout.to_owned(),
327 on_timeout: on_timeout.to_owned(),
328 }),
329 ))),
330 };
331 }
332 }
333 (Unmanaged, Managed(_)) => {
334 self.sequence_alter_cluster_unmanaged_to_managed(
335 session,
336 cluster_id,
337 new_config,
338 options.to_owned(),
339 )
340 .await?;
341 }
342 (Managed(_), Unmanaged) => {
343 self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
344 .await?;
345 }
346 (Unmanaged, Unmanaged) => {
347 self.sequence_alter_cluster_unmanaged_to_unmanaged(
348 session,
349 cluster_id,
350 new_config,
351 options.replicas.clone(),
352 )
353 .await?;
354 }
355 }
356
357 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
358 ObjectType::Cluster,
359 )))
360 }
361
362 async fn finalize_alter_cluster_stage(
363 &mut self,
364 session: &Session,
365 AlterClusterPlan {
366 id: cluster_id,
367 name: cluster_name,
368 ..
369 }: AlterClusterPlan,
370 new_config: ClusterVariantManaged,
371 workload_class: Option<String>,
372 ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
373 let cluster = self.catalog.get_cluster(cluster_id);
374 let mut ops = vec![];
375
376 let remove_replicas = cluster
379 .replicas()
380 .filter_map(|r| {
381 if !r.config.location.pending() && !r.config.location.internal() {
382 Some(catalog::DropObjectInfo::ClusterReplica((
383 cluster_id.clone(),
384 r.replica_id,
385 ReplicaCreateDropReason::Manual,
386 )))
387 } else {
388 None
389 }
390 })
391 .collect();
392 ops.push(catalog::Op::DropObjects(remove_replicas));
393
394 let finalize_replicas: Vec<catalog::Op> = cluster
397 .replicas()
398 .filter_map(|r| {
399 if r.config.location.pending() {
400 let cluster_ident = match Ident::new(cluster.name.clone()) {
401 Ok(id) => id,
402 Err(err) => {
403 return Some(Err(AdapterError::internal(
404 "Unexpected error parsing cluster name",
405 err,
406 )));
407 }
408 };
409 let replica_ident = match Ident::new(r.name.clone()) {
410 Ok(id) => id,
411 Err(err) => {
412 return Some(Err(AdapterError::internal(
413 "Unexpected error parsing replica name",
414 err,
415 )));
416 }
417 };
418 Some(Ok((cluster_ident, replica_ident, r)))
419 } else {
420 None
421 }
422 })
423 .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
426 .into_iter()
427 .map(|(cluster_ident, replica_ident, replica)| {
428 let mut new_replica_config = replica.config.clone();
429 debug!("Promoting replica: {}", replica.name);
430 match new_replica_config.location {
431 mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
432 ref mut pending,
433 ..
434 }) => {
435 *pending = false;
436 }
437 mz_controller::clusters::ReplicaLocation::Unmanaged(_) => {}
438 }
439
440 let mut replica_ops = vec![];
441 let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
442 if let Some(to_name) = to_name {
443 replica_ops.push(catalog::Op::RenameClusterReplica {
444 cluster_id: cluster_id.clone(),
445 replica_id: replica.replica_id.to_owned(),
446 name: QualifiedReplica {
447 cluster: cluster_ident,
448 replica: replica_ident,
449 },
450 to_name: to_name.to_owned(),
451 });
452 }
453 replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
454 cluster_id,
455 replica_id: replica.replica_id.to_owned(),
456 config: new_replica_config,
457 });
458 replica_ops
459 })
460 .flatten()
461 .collect();
462
463 ops.extend(finalize_replicas);
464
465 ops.push(Op::UpdateClusterConfig {
467 id: cluster_id,
468 name: cluster_name,
469 config: ClusterConfig {
470 variant: ClusterVariant::Managed(new_config),
471 workload_class: workload_class.clone(),
472 },
473 });
474 self.catalog_transact(Some(session), ops).await?;
475 self.active_conns
478 .get_mut(session.conn_id())
479 .expect("There must be an active connection")
480 .pending_cluster_alters
481 .remove(&cluster_id);
482
483 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
484 ObjectType::Cluster,
485 )))
486 }
487
    /// Polls whether the pending replicas of the cluster being altered are
    /// hydrated, rescheduling itself once per second until they are (then
    /// moving to `Finalize`) or until `timeout_time` passes, at which point
    /// `on_timeout` decides whether to roll the reconfiguration back or
    /// commit it anyway.
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        workload_class: Option<String>,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(plan.id);
        // The replicas created by this reconfiguration, identified by their
        // pending flag.
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        // Handle the timeout before checking hydration.
        if Instant::now() > timeout_time {
            match on_timeout {
                OnTimeoutAction::Rollback => {
                    // Abort: clear the connection's pending-alter marker and
                    // drop the reconfiguration replicas.
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                OnTimeoutAction::Commit => {
                    // Commit despite not being hydrated: schedule the
                    // finalize stage after one more poll interval.
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                                workload_class,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        // Ask the compute and storage controllers whether the pending
        // replicas' collections are hydrated.
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // All pending replicas are hydrated: move to finalize.
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config: new_config.clone(),
                        workload_class: workload_class.clone(),
                    })))
                } else {
                    // Not hydrated yet: re-enter this stage after a second.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        workload_class,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }
593
    /// Creates a cluster (managed or unmanaged) as described by `plan`:
    /// allocates a fresh user cluster ID, builds the `CreateCluster` catalog
    /// op, and delegates replica creation to the variant-specific sequencing
    /// method, which commits the transaction.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster(
        &mut self,
        session: &Session,
        CreateClusterPlan {
            name,
            variant,
            workload_class,
        }: CreateClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_cluster");

        let id_ts = self.get_catalog_write_ts().await;
        let id = self.catalog().allocate_user_cluster_id(id_ts).await?;
        // Every cluster is created with the builtin log (introspection)
        // sources.
        let introspection_sources = BUILTINS::logs().collect();
        let cluster_variant = match &variant {
            CreateClusterVariant::Managed(plan) => {
                let logging = if let Some(config) = plan.compute.introspection {
                    ReplicaLogging {
                        log_logging: config.debugging,
                        interval: Some(config.interval),
                    }
                } else {
                    ReplicaLogging::default()
                };
                ClusterVariant::Managed(ClusterVariantManaged {
                    size: plan.size.clone(),
                    availability_zones: plan.availability_zones.clone(),
                    logging,
                    replication_factor: plan.replication_factor,
                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
                    schedule: plan.schedule.clone(),
                })
            }
            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
        };
        let config = ClusterConfig {
            variant: cluster_variant,
            workload_class,
        };
        let ops = vec![catalog::Op::CreateCluster {
            id,
            name: name.clone(),
            introspection_sources,
            owner_id: *session.current_role_id(),
            config,
        }];

        // The variant-specific method appends replica ops and commits.
        match variant {
            CreateClusterVariant::Managed(plan) => {
                self.sequence_create_managed_cluster(session, plan, id, ops)
                    .await
            }
            CreateClusterVariant::Unmanaged(plan) => {
                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
                    .await
            }
        }
    }
657
    /// Creates a managed cluster: validates availability zones, replica size
    /// permissions, and the per-cluster replica limit, then appends one
    /// replica op per unit of the replication factor to `ops` and commits
    /// everything in a single catalog transaction.
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_managed_cluster(
        &mut self,
        session: &Session,
        CreateClusterManagedPlan {
            availability_zones,
            compute,
            replication_factor,
            size,
            optimizer_feature_overrides: _,
            schedule: _,
        }: CreateClusterManagedPlan,
        cluster_id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_managed_cluster");

        self.ensure_valid_azs(availability_zones.iter())?;

        // The current role must be allowed to use the requested size.
        let role_id = session.role_metadata().current_role;
        self.catalog.ensure_valid_replica_size(
            &self
                .catalog()
                .get_role_allowed_cluster_sizes(&Some(role_id)),
            &size,
        )?;

        // The replica limit only applies to user clusters.
        if cluster_id.is_user() {
            self.validate_resource_limit(
                0,
                i64::from(replication_factor),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
            self.create_managed_cluster_replica_op(
                cluster_id,
                replica_name,
                &compute,
                &size,
                &mut ops,
                // No AZ list means the orchestrator chooses freely.
                if availability_zones.is_empty() {
                    None
                } else {
                    Some(availability_zones.as_ref())
                },
                false,
                *session.current_role_id(),
                ReplicaCreateDropReason::Manual,
            )?;
        }

        self.catalog_transact(Some(session), ops).await?;

        Ok(ExecuteResponse::CreatedCluster)
    }
721
722 fn create_managed_cluster_replica_op(
723 &self,
724 cluster_id: ClusterId,
725 name: String,
726 compute: &mz_sql::plan::ComputeReplicaConfig,
727 size: &String,
728 ops: &mut Vec<Op>,
729 azs: Option<&[String]>,
730 pending: bool,
731 owner_id: RoleId,
732 reason: ReplicaCreateDropReason,
733 ) -> Result<(), AdapterError> {
734 let location = mz_catalog::durable::ReplicaLocation::Managed {
735 availability_zone: None,
736 billed_as: None,
737 internal: false,
738 size: size.clone(),
739 pending,
740 };
741
742 let logging = if let Some(config) = compute.introspection {
743 ReplicaLogging {
744 log_logging: config.debugging,
745 interval: Some(config.interval),
746 }
747 } else {
748 ReplicaLogging::default()
749 };
750
751 let config = ReplicaConfig {
752 location: self.catalog().concretize_replica_location(
753 location,
754 &self
755 .catalog()
756 .get_role_allowed_cluster_sizes(&Some(owner_id)),
757 azs,
758 )?,
759 compute: ComputeReplicaConfig { logging },
760 };
761
762 ops.push(catalog::Op::CreateClusterReplica {
763 cluster_id,
764 name,
765 config,
766 owner_id,
767 reason,
768 });
769 Ok(())
770 }
771
772 fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
773 &self,
774 azs: I,
775 ) -> Result<(), AdapterError> {
776 let cat_azs = self.catalog().state().availability_zones();
777 for az in azs.into_iter() {
778 if !cat_azs.contains(az) {
779 return Err(AdapterError::InvalidClusterReplicaAz {
780 az: az.to_string(),
781 expected: cat_azs.to_vec(),
782 });
783 }
784 }
785 Ok(())
786 }
787
    /// Creates an unmanaged cluster together with its explicitly-listed
    /// replicas, validating availability zones, the replica limit, and
    /// INTERNAL / BILLED AS constraints before committing a single catalog
    /// transaction.
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        // Validate the availability zones of all orchestrated replicas up
        // front.
        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        // The replica limit only applies to user clusters.
        if id.is_user() {
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            // Translate the plan-level replica config into a durable replica
            // location plus its compute config.
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    computectl_addrs,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        computectl_addrs,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    internal,
                    size,
                } => {
                    // Only internal users may create INTERNAL or BILLED AS
                    // replicas.
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            let role_id = session.role_metadata().current_role;
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        Ok(ExecuteResponse::CreatedCluster)
    }
900
    /// Creates a single cluster replica (`CREATE CLUSTER REPLICA`),
    /// validating its availability zone and INTERNAL / BILLED AS constraints
    /// before committing the catalog op.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Translate the plan-level replica config into a durable replica
        // location plus its compute config.
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    computectl_addrs,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                internal,
                size,
            } => {
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let role_id = session.role_metadata().current_role;
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            // Only internal users may create INTERNAL or BILLED AS replicas.
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            // Manually created replicas in a managed cluster must be INTERNAL.
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        // The new replica is owned by the cluster's owner, not the session's
        // current role.
        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }
1008
    /// Alters a managed cluster to another managed configuration.
    ///
    /// Changes that require re-creating replicas (size, availability zones,
    /// logging) are applied according to `strategy`: with `None` the replicas
    /// are replaced immediately; otherwise pending replicas (suffixed with
    /// `PENDING_REPLICA_SUFFIX`) are created and `NeedsFinalization::Yes` is
    /// returned, deferring the cluster-config update to the finalize stage.
    /// Pure replication-factor changes just add or drop replicas in place.
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // The acting role must be allowed to use the requested size.
        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // Disallow a new alter while a graceful reconfiguration is in flight.
        if cluster.replicas().any(|r| r.config.location.pending()) {
            return Err(AlterClusterWhilePendingReplicas);
        }

        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // Check the replica limit up front when the factor grows (user
        // clusters only).
        if new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
        {
            self.ensure_valid_azs(new_availability_zones.iter())?;
            // These changes require replacing every replica.
            match strategy {
                AlterClusterPlanStrategy::None => {
                    // Drop all existing managed replicas and recreate them at
                    // the new configuration immediately.
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                }
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    // Graceful reconfiguration: create suffixed pending
                    // replicas alongside the existing ones; promotion and the
                    // config update happen in the finalize stage.
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Shrink: drop the highest-numbered replicas.
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Grow: create only the additional replicas.
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    Some(new_availability_zones.as_ref()),
                    false,
                    owner_id,
                    reason.clone(),
                )?;
            }
        }

        // When finalization is needed, the config update is written by the
        // finalize stage after the pending replicas are promoted.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name: name.clone(),
                    config: new_config,
                });
            }
            NeedsFinalization::Yes => {}
        }
        self.catalog_transact(session, ops).await?;
        Ok(finalization_needed)
    }
1195
    /// Converts an unmanaged cluster into a managed one.
    ///
    /// Validates that the cluster's existing user replicas are compatible with
    /// the requested managed configuration (replication factor, size,
    /// availability zones, replica names) and then persists `new_config` via a
    /// catalog transaction. Fails with a user-facing error (`coord_bail!`) on
    /// any mismatch; panics if `new_config` is not a managed variant, since
    /// planning must guarantee that.
    async fn sequence_alter_cluster_unmanaged_to_managed(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        mut new_config: ClusterConfig,
        options: PlanClusterOption,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let cluster_name = cluster.name().to_string();

        // Destructure the target managed variant so the validation below can
        // fill in fields (size, replication factor) derived from the existing
        // replicas when the user did not set them explicitly.
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: _,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &mut new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // Count is usize; the replication factor field is narrower (u32), so
        // convert checked. The replica-count limit enforced elsewhere makes
        // overflow impossible in practice, hence the expect.
        let user_replica_count = cluster
            .user_replicas()
            .count()
            .try_into()
            .expect("must_fit");
        match options.replication_factor {
            AlterOptionParameter::Set(_) => {
                // An explicitly requested REPLICATION FACTOR must match the
                // number of replicas that already exist — we never create or
                // drop replicas during this conversion.
                if user_replica_count != *new_replication_factor {
                    coord_bail!(
                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
                    );
                }
            }
            _ => {
                // Not specified: infer the factor from the current replicas.
                *new_replication_factor = user_replica_count;
            }
        }

        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        // Collect the existing replicas' names and sizes, rejecting anything
        // that cannot be expressed as a managed replica.
        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());

                    // A replica pinned to a specific AZ must be in the managed
                    // cluster's AZ set, or the conversion would move it.
                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            // No replicas → no size to infer; the user must supply SIZE.
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                AlterOptionParameter::Set(_) => {} }
        } else if sizes.len() == 1 {
            // All replicas share one size: adopt it, unless the user asked for
            // a different one, which we reject rather than resize replicas.
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            // Mixed replica sizes cannot be represented by a managed cluster.
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        // Managed replicas are named r1..rN; every existing replica name must
        // be one of those, otherwise the name scheme cannot be preserved.
        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }
1325
1326 async fn sequence_alter_cluster_managed_to_unmanaged(
1327 &mut self,
1328 session: &Session,
1329 cluster_id: ClusterId,
1330 new_config: ClusterConfig,
1331 ) -> Result<(), AdapterError> {
1332 let cluster = self.catalog().get_cluster(cluster_id);
1333
1334 let ops = vec![catalog::Op::UpdateClusterConfig {
1335 id: cluster_id,
1336 name: cluster.name().to_string(),
1337 config: new_config,
1338 }];
1339
1340 self.catalog_transact(Some(session), ops).await?;
1341 Ok(())
1342 }
1343
1344 async fn sequence_alter_cluster_unmanaged_to_unmanaged(
1345 &mut self,
1346 session: &Session,
1347 cluster_id: ClusterId,
1348 new_config: ClusterConfig,
1349 replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
1350 ) -> Result<(), AdapterError> {
1351 if !matches!(replicas, AlterOptionParameter::Unchanged) {
1352 coord_bail!("Cannot alter replicas in unmanaged cluster");
1353 }
1354
1355 let cluster = self.catalog().get_cluster(cluster_id);
1356
1357 let ops = vec![catalog::Op::UpdateClusterConfig {
1358 id: cluster_id,
1359 name: cluster.name().to_string(),
1360 config: new_config,
1361 }];
1362
1363 self.catalog_transact(Some(session), ops).await?;
1364 Ok(())
1365 }
1366
1367 pub(crate) async fn sequence_alter_cluster_rename(
1368 &mut self,
1369 ctx: &mut ExecuteContext,
1370 AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
1371 ) -> Result<ExecuteResponse, AdapterError> {
1372 let op = Op::RenameCluster {
1373 id,
1374 name,
1375 to_name,
1376 check_reserved_names: true,
1377 };
1378 match self
1379 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
1380 .await
1381 {
1382 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1383 Err(err) => Err(err),
1384 }
1385 }
1386
1387 pub(crate) async fn sequence_alter_cluster_swap(
1388 &mut self,
1389 ctx: &mut ExecuteContext,
1390 AlterClusterSwapPlan {
1391 id_a,
1392 id_b,
1393 name_a,
1394 name_b,
1395 name_temp,
1396 }: AlterClusterSwapPlan,
1397 ) -> Result<ExecuteResponse, AdapterError> {
1398 let op_a = Op::RenameCluster {
1399 id: id_a,
1400 name: name_a.clone(),
1401 to_name: name_temp.clone(),
1402 check_reserved_names: false,
1403 };
1404 let op_b = Op::RenameCluster {
1405 id: id_b,
1406 name: name_b.clone(),
1407 to_name: name_a,
1408 check_reserved_names: false,
1409 };
1410 let op_temp = Op::RenameCluster {
1411 id: id_a,
1412 name: name_temp,
1413 to_name: name_b,
1414 check_reserved_names: false,
1415 };
1416
1417 match self
1418 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
1419 Box::pin(async {})
1420 })
1421 .await
1422 {
1423 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1424 Err(err) => Err(err),
1425 }
1426 }
1427
1428 pub(crate) async fn sequence_alter_cluster_replica_rename(
1429 &mut self,
1430 session: &Session,
1431 AlterClusterReplicaRenamePlan {
1432 cluster_id,
1433 replica_id,
1434 name,
1435 to_name,
1436 }: AlterClusterReplicaRenamePlan,
1437 ) -> Result<ExecuteResponse, AdapterError> {
1438 let op = catalog::Op::RenameClusterReplica {
1439 cluster_id,
1440 replica_id,
1441 name,
1442 to_name,
1443 };
1444 match self.catalog_transact(Some(session), vec![op]).await {
1445 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
1446 Err(err) => Err(err),
1447 }
1448 }
1449
1450 pub(crate) async fn sequence_alter_set_cluster(
1452 &self,
1453 _session: &Session,
1454 AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
1455 ) -> Result<ExecuteResponse, AdapterError> {
1456 async {}.await;
1460 let entry = self.catalog().get_entry(&id);
1461 match entry.item().typ() {
1462 _ => {
1463 Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
1465 }
1466 }
1467 }
1468}
1469
/// Canonical name for the replica at `index` in a managed cluster.
///
/// Managed replicas are named `r1`, `r2`, ... — one-based, so index 0
/// maps to `"r1"`.
fn managed_cluster_replica_name(index: u32) -> String {
    let ordinal = index + 1;
    format!("r{ordinal}")
}
1473
/// Whether an `ALTER CLUSTER` must run a later finalization stage.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum NeedsFinalization {
    /// A finalization stage must still run after the catalog transaction.
    Yes,
    /// No finalization needed; the config update completed in one step.
    No,
}