use std::collections::BTreeSet;
use std::time::{Duration, Instant};

use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::builtin::BUILTINS;
use mz_catalog::memory::objects::{
    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
};
use mz_compute_types::config::ComputeReplicaConfig;
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
    ReplicaLogging,
};
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::role_id::RoleId;
use mz_sql::ast::{Ident, QualifiedReplica};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, ObjectType};
use mz_sql::plan::{
    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
};
use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
use tracing::{Instrument, Span, debug};

use super::return_if_err;
use crate::AdapterError::AlterClusterWhilePendingReplicas;
use crate::catalog::{self, Op, ReplicaCreateDropReason};
use crate::coord::{
    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
    Message, PlanValidity, StageResult, Staged,
};
use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};

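/// Suffix appended to the name of a replica created as part of a graceful
/// `ALTER CLUSTER`; it is stripped again when the pending replica is promoted.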
const PENDING_REPLICA_SUFFIX: &str = "-pending";

impl Staged for ClusterStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            Self::Alter(stage) => &mut stage.validity,
            Self::WaitForHydrated(stage) => &mut stage.validity,
            Self::Finalize(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
        match self {
            Self::Alter(stage) => {
                coord
                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
                    .await
            }
            Self::WaitForHydrated(stage) => {
                let AlterClusterWaitForHydrated {
                    validity,
                    plan,
                    new_config,
                    timeout_time,
                    on_timeout,
                } = stage;
                coord
                    .check_if_pending_replicas_hydrated_stage(
                        ctx.session(),
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                        validity,
                    )
                    .await
            }
            Self::Finalize(stage) => {
                coord
                    .finalize_alter_cluster_stage(
                        ctx.session(),
                        stage.plan.clone(),
                        stage.new_config.clone(),
                    )
                    .await
            }
        }
    }

    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
        Message::ClusterStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
    #[instrument]
    pub(crate) async fn sequence_alter_cluster_staged(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterClusterPlan,
    ) {
        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    #[instrument]
    async fn alter_cluster_validate(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
    ) -> Result<ClusterStage, AdapterError> {
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::new(),
            Some(plan.id.clone()),
            None,
            session.role_metadata().clone(),
        );
        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
    }

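    /// Applies the changes requested by an `ALTER CLUSTER` plan to the cluster
    /// configuration and decides whether the alter completes immediately or
    /// must wait for pending replicas to be finalized.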
    async fn sequence_alter_cluster_stage(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let AlterClusterPlan {
            id: cluster_id,
            name: _,
            ref options,
            ref strategy,
        } = plan;

        use mz_catalog::memory::objects::ClusterVariant::*;
        use mz_sql::plan::AlterOptionParameter::*;
        let cluster = self.catalog.get_cluster(cluster_id);
        let config = cluster.config.clone();
        let mut new_config = config.clone();

        match (&new_config.variant, &options.managed) {
            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
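                // Convert the unmanaged cluster to a managed one by starting
                // from a minimal managed configuration. The placeholder SIZE
                // is overwritten (or rejected) by the option handling below.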
                let size = "".to_string();
                let logging = ReplicaLogging {
                    log_logging: false,
                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                };
                new_config.variant = Managed(ClusterVariantManaged {
                    size,
                    availability_zones: Default::default(),
                    logging,
                    replication_factor: 1,
                    optimizer_feature_overrides: Default::default(),
                    schedule: Default::default(),
                });
            }
        }

        match &mut new_config.variant {
            Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides: _,
                schedule,
            }) => {
                match &options.size {
                    Set(s) => size.clone_from(s),
                    Reset => coord_bail!("SIZE has no default value"),
                    Unchanged => {}
                }
                match &options.availability_zones {
                    Set(az) => availability_zones.clone_from(az),
                    Reset => *availability_zones = Default::default(),
                    Unchanged => {}
                }
                match &options.introspection_debugging {
                    Set(id) => logging.log_logging = *id,
                    Reset => logging.log_logging = false,
                    Unchanged => {}
                }
                match &options.introspection_interval {
                    Set(ii) => logging.interval = ii.0,
                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                    Unchanged => {}
                }
                match &options.replication_factor {
                    Set(rf) => *replication_factor = *rf,
                    Reset => {
                        *replication_factor = self
                            .catalog
                            .system_config()
                            .default_cluster_replication_factor()
                    }
                    Unchanged => {}
                }
                match &options.schedule {
                    Set(new_schedule) => {
                        *schedule = new_schedule.clone();
                    }
                    Reset => *schedule = Default::default(),
                    Unchanged => {}
                }
                if !matches!(options.replicas, Unchanged) {
                    coord_bail!("Cannot change REPLICAS of managed clusters");
                }
            }
            Unmanaged => {
                if !matches!(options.size, Unchanged) {
                    coord_bail!("Cannot change SIZE of unmanaged clusters");
                }
                if !matches!(options.availability_zones, Unchanged) {
                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
                }
                if !matches!(options.introspection_debugging, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION DEBUGGING of unmanaged clusters");
248 }
249 if !matches!(options.introspection_interval, Unchanged) {
250 coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
251 }
252 if !matches!(options.replication_factor, Unchanged) {
253 coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
254 }
255 }
256 }
257
258 match &options.workload_class {
259 Set(wc) => new_config.workload_class.clone_from(wc),
260 Reset => new_config.workload_class = None,
261 Unchanged => {}
262 }
263
264 if new_config == config {
265 return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
266 ObjectType::Cluster,
267 )));
268 }
269
270 let new_workload_class = new_config.workload_class.clone();
271 match (&config.variant, &new_config.variant) {
272 (Managed(_), Managed(new_config_managed)) => {
273 let alter_followup = self
274 .sequence_alter_cluster_managed_to_managed(
275 Some(session),
276 cluster_id,
277 new_config.clone(),
278 ReplicaCreateDropReason::Manual,
279 strategy.clone(),
280 )
281 .await?;
282 if alter_followup == NeedsFinalization::Yes {
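                    // The alter needs a finalization step: remember it on the
                    // connection so it can be cancelled or cleaned up if the
                    // connection goes away.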
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .insert(cluster_id.clone());
                    let new_config_managed = new_config_managed.clone();
                    return match &strategy {
                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
                                .into(),
                        )),
                        AlterClusterPlanStrategy::For(duration) => {
                            let span = Span::current();
                            let plan = plan.clone();
                            let duration = duration.to_owned();
                            Ok(StageResult::Handle(mz_ore::task::spawn(
                                || "Finalize Alter Cluster",
                                async move {
                                    tokio::time::sleep(duration).await;
                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                        validity,
                                        plan,
                                        new_config: new_config_managed,
                                    });
                                    Ok(Box::new(stage))
                                }
                                .instrument(span),
                            )))
                        }
                        AlterClusterPlanStrategy::UntilReady {
                            timeout,
                            on_timeout,
                        } => Ok(StageResult::Immediate(Box::new(
                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                                validity,
                                plan: plan.clone(),
                                new_config: new_config_managed.clone(),
                                timeout_time: Instant::now() + timeout.to_owned(),
                                on_timeout: on_timeout.to_owned(),
                            }),
                        ))),
                    };
                }
            }
            (Unmanaged, Managed(_)) => {
                self.sequence_alter_cluster_unmanaged_to_managed(
                    session,
                    cluster_id,
                    new_config,
                    options.to_owned(),
                )
                .await?;
            }
            (Managed(_), Unmanaged) => {
                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
                    .await?;
            }
            (Unmanaged, Unmanaged) => {
                self.sequence_alter_cluster_unmanaged_to_unmanaged(
                    session,
                    cluster_id,
                    new_config,
                    options.replicas.clone(),
                )
                .await?;
            }
        }

        self.controller
            .update_cluster_workload_class(cluster_id, new_workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

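    /// Promotes the pending replicas of an altered cluster, drops the replicas
    /// they replace, and commits the new cluster configuration.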
    async fn finalize_alter_cluster_stage(
        &mut self,
        session: &Session,
        AlterClusterPlan {
            id: cluster_id,
            name: cluster_name,
            ..
        }: AlterClusterPlan,
        new_config: ClusterVariantManaged,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let workload_class = cluster.config.workload_class.clone();
        let mut ops = vec![];

        // Gather the ops to drop the replicas that are being replaced, i.e.
        // all replicas that are neither pending nor internal.
        let remove_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if !r.config.location.pending() && !r.config.location.internal() {
                    Some(catalog::DropObjectInfo::ClusterReplica((
                        cluster_id.clone(),
                        r.replica_id,
                        ReplicaCreateDropReason::Manual,
                    )))
                } else {
                    None
                }
            })
            .collect();
        ops.push(catalog::Op::DropObjects(remove_replicas));

        // Gather the ops to promote the pending replicas: strip the pending
        // suffix from their names and mark them as no longer pending.
        let finalize_replicas: Vec<catalog::Op> = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    let cluster_ident = match Ident::new(cluster.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing cluster name",
                                err,
                            )));
                        }
                    };
                    let replica_ident = match Ident::new(r.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing replica name",
                                err,
                            )));
                        }
                    };
                    Some(Ok((cluster_ident, replica_ident, r)))
                } else {
                    None
                }
            })
            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
            .into_iter()
            .map(|(cluster_ident, replica_ident, replica)| {
                let mut new_replica_config = replica.config.clone();
                debug!("Promoting replica: {}", replica.name);
                match new_replica_config.location {
                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
                        ref mut pending,
                        ..
                    }) => {
                        *pending = false;
                    }
                    _ => {}
                }

                let mut replica_ops = vec![];
                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
                if let Some(to_name) = to_name {
                    replica_ops.push(catalog::Op::RenameClusterReplica {
                        cluster_id: cluster_id.clone(),
                        replica_id: replica.replica_id.to_owned(),
                        name: QualifiedReplica {
                            cluster: cluster_ident,
                            replica: replica_ident,
                        },
                        to_name: to_name.to_owned(),
                    });
                }
                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
                    cluster_id,
                    replica_id: replica.replica_id.to_owned(),
                    config: new_replica_config,
                });
                replica_ops
            })
            .flatten()
            .collect();

        ops.extend(finalize_replicas);

        // Commit the new cluster configuration.
        ops.push(Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: ClusterConfig {
                variant: ClusterVariant::Managed(new_config),
                workload_class: workload_class.clone(),
            },
        });
        self.catalog_transact(Some(session), ops).await?;
        // The alter is complete: clear the pending flag on the connection.
        self.active_conns
            .get_mut(session.conn_id())
            .expect("There must be an active connection")
            .pending_cluster_alters
            .remove(&cluster_id);

        self.controller
            .update_cluster_workload_class(cluster_id, workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

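    /// Checks whether the pending replicas of an altered cluster have
    /// hydrated. If they have, finalization is scheduled; if not, the check is
    /// retried until `timeout_time`, at which point the configured
    /// `on_timeout` action (commit or roll back) is taken.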
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(plan.id);
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        // If we have timed out, take the configured on-timeout action.
        if Instant::now() > timeout_time {
            match on_timeout {
                OnTimeoutAction::Rollback => {
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                OnTimeoutAction::Commit => {
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // All pending replicas are hydrated: proceed to finalization.
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config,
                    })))
                } else {
                    // Not hydrated yet: wait a bit and check again.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster(
        &mut self,
        session: &Session,
        CreateClusterPlan {
            name,
            variant,
            workload_class,
        }: CreateClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_cluster");

        let id_ts = self.get_catalog_write_ts().await;
        let id = self.catalog_mut().allocate_user_cluster_id(id_ts).await?;
        // The catalog items for the introspection sources are shared between
        // all replicas of a cluster, so they are created unconditionally with
        // the cluster itself.
        let introspection_sources = BUILTINS::logs().collect();
        let cluster_variant = match &variant {
            CreateClusterVariant::Managed(plan) => {
                let logging = if let Some(config) = plan.compute.introspection {
                    ReplicaLogging {
                        log_logging: config.debugging,
                        interval: Some(config.interval),
                    }
                } else {
                    ReplicaLogging::default()
                };
                ClusterVariant::Managed(ClusterVariantManaged {
                    size: plan.size.clone(),
                    availability_zones: plan.availability_zones.clone(),
                    logging,
                    replication_factor: plan.replication_factor,
                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
                    schedule: plan.schedule.clone(),
                })
            }
            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
        };
        let config = ClusterConfig {
            variant: cluster_variant,
            workload_class,
        };
        let ops = vec![catalog::Op::CreateCluster {
            id,
            name: name.clone(),
            introspection_sources,
            owner_id: *session.current_role_id(),
            config,
        }];

        match variant {
            CreateClusterVariant::Managed(plan) => {
                self.sequence_create_managed_cluster(session, plan, id, ops)
                    .await
            }
            CreateClusterVariant::Unmanaged(plan) => {
                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
                    .await
            }
        }
    }

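    /// Creates a managed cluster and one replica per requested
    /// `replication_factor`.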
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_managed_cluster(
        &mut self,
        session: &Session,
        CreateClusterManagedPlan {
            availability_zones,
            compute,
            replication_factor,
            size,
            optimizer_feature_overrides: _,
            schedule: _,
        }: CreateClusterManagedPlan,
        cluster_id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_managed_cluster");

        self.ensure_valid_azs(availability_zones.iter())?;

        let role_id = session.role_metadata().current_role;
        self.catalog.ensure_valid_replica_size(
            &self
                .catalog()
                .get_role_allowed_cluster_sizes(&Some(role_id)),
            &size,
        )?;

        // Eagerly validate the replica limit for user clusters, so we bail
        // before doing any more work if the limit would be exceeded.
        if cluster_id.is_user() {
            self.validate_resource_limit(
                0,
                i64::from(replication_factor),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
            self.create_managed_cluster_replica_op(
                cluster_id,
                replica_name,
                &compute,
                &size,
                &mut ops,
                if availability_zones.is_empty() {
                    None
                } else {
                    Some(availability_zones.as_ref())
                },
                false,
                *session.current_role_id(),
                ReplicaCreateDropReason::Manual,
            )?;
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(cluster_id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

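    /// Appends to `ops` the catalog op that creates one managed replica with
    /// the given name, size, and introspection configuration.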
    fn create_managed_cluster_replica_op(
        &self,
        cluster_id: ClusterId,
        name: String,
        compute: &mz_sql::plan::ComputeReplicaConfig,
        size: &String,
        ops: &mut Vec<Op>,
        azs: Option<&[String]>,
        pending: bool,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    ) -> Result<(), AdapterError> {
        let location = mz_catalog::durable::ReplicaLocation::Managed {
            availability_zone: None,
            billed_as: None,
            internal: false,
            size: size.clone(),
            pending,
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
                azs,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        ops.push(catalog::Op::CreateClusterReplica {
            cluster_id,
            name,
            config,
            owner_id,
            reason,
        });
        Ok(())
    }

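    /// Returns an error if any of the given availability zones is not known to
    /// the catalog.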
    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
        &self,
        azs: I,
    ) -> Result<(), AdapterError> {
        let cat_azs = self.catalog().state().availability_zones();
        for az in azs.into_iter() {
            if !cat_azs.contains(az) {
                return Err(AdapterError::InvalidClusterReplicaAz {
                    az: az.to_string(),
                    expected: cat_azs.to_vec(),
                });
            }
        }
        Ok(())
    }

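    /// Creates an unmanaged cluster together with its explicitly specified
    /// replicas.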
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        // Eagerly validate the replica limit for user clusters, so we bail
        // before doing any more work if the limit would be exceeded.
        if id.is_user() {
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    computectl_addrs,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        computectl_addrs,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    internal,
                    size,
                } => {
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            let role_id = session.role_metadata().current_role;
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

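    /// Introduces a newly cataloged cluster (and its replicas) to the
    /// controller and initializes read policies for its introspection sources.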
    async fn create_cluster(&mut self, cluster_id: ClusterId) {
        let Coordinator {
            catalog,
            controller,
            ..
        } = self;
        let cluster = catalog.get_cluster(cluster_id);
        let cluster_id = cluster.id;
        let introspection_source_ids: Vec<_> =
            cluster.log_indexes.iter().map(|(_, id)| *id).collect();

        controller
            .create_cluster(
                cluster_id,
                mz_controller::clusters::ClusterConfig {
                    arranged_logs: cluster.log_indexes.clone(),
                    workload_class: cluster.config.workload_class.clone(),
                },
            )
            .expect("creating cluster must not fail");

        let replica_ids: Vec<_> = cluster
            .replicas()
            .map(|r| (r.replica_id, format!("{}.{}", cluster.name(), &r.name)))
            .collect();
        for (replica_id, replica_name) in replica_ids {
            self.create_cluster_replica(cluster_id, replica_id, replica_name)
                .await;
        }

        if !introspection_source_ids.is_empty() {
            self.initialize_compute_read_policies(
                introspection_source_ids,
                cluster_id,
                CompactionWindow::Default,
            )
            .await;
        }
    }

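    /// Creates a single replica in an existing cluster, as requested by
    /// `CREATE CLUSTER REPLICA`.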
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    computectl_addrs,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                internal,
                size,
            } => {
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let role_id = session.role_metadata().current_role;
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        let id = self
            .catalog()
            .resolve_replica_in_cluster(&cluster_id, &name)
            .expect("just created")
            .replica_id();

        self.create_cluster_replica(cluster_id, id, name).await;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }

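    /// Introduces a newly cataloged replica to the controller and installs its
    /// introspection subscribes.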
    async fn create_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: String,
    ) {
        let cluster = self.catalog().get_cluster(cluster_id);
        let role = cluster.role();
        let replica_config = cluster
            .replica(replica_id)
            .expect("known to exist")
            .config
            .clone();

        let enable_worker_core_affinity =
            self.catalog().system_config().enable_worker_core_affinity();

        self.controller
            .create_replica(
                cluster_id,
                replica_id,
                cluster.name.to_owned(),
                replica_name,
                role,
                replica_config,
                enable_worker_core_affinity,
            )
            .expect("creating replicas must not fail");

        self.install_introspection_subscribes(cluster_id, replica_id)
            .await;
    }

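    /// Alters a managed cluster to a new managed configuration, creating or
    /// dropping replicas as needed.
    ///
    /// Returns [`NeedsFinalization::Yes`] if the alter created pending
    /// replicas that must be promoted by a later finalization stage.
    ///
    /// # Panics
    ///
    /// Panics if the identified cluster or `new_config` is not managed.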
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut create_cluster_replicas = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // A cluster with pending replicas has an alter in progress; refuse to
        // start another one.
        if cluster.replicas().any(|r| r.config.location.pending()) {
            return Err(AlterClusterWhilePendingReplicas);
        }

        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // Eagerly validate the replica limit for user clusters if the
        // replication factor is increasing.
        if new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
        {
            self.ensure_valid_azs(new_availability_zones.iter())?;
            // The per-replica configuration changed, so all replicas must be
            // replaced.
            match strategy {
                // Replace the replicas immediately.
                AlterClusterPlanStrategy::None => {
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                }
                // Create the new replicas as pending replicas; the old ones
                // are dropped when the alter is finalized.
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Only the replication factor decreased: drop the excess replicas.
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Only the replication factor increased: create the missing
            // replicas.
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    Some(new_availability_zones.as_ref()),
                    false,
                    owner_id,
                    reason.clone(),
                )?;
                create_cluster_replicas.push((cluster_id, name));
            }
        }

        // If no finalization is needed, the new config can be committed
        // immediately; otherwise it is committed during finalization.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name: name.clone(),
                    config: new_config,
                });
            }
            _ => {}
        }
        self.catalog_transact(session, ops.clone()).await?;
        for (cluster_id, replica_name) in create_cluster_replicas {
            let replica_id = self
                .catalog()
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            self.create_cluster_replica(cluster_id, replica_id, replica_name)
                .await;
        }
        Ok(finalization_needed)
    }

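    /// Converts an unmanaged cluster to a managed one, checking that the
    /// existing replicas are consistent with the requested managed
    /// configuration.
    ///
    /// # Panics
    ///
    /// Panics if `new_config` is not a configuration for a managed cluster.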
    async fn sequence_alter_cluster_unmanaged_to_managed(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        mut new_config: ClusterConfig,
        options: PlanClusterOption,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let cluster_name = cluster.name().to_string();

        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: _,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &mut new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let user_replica_count = cluster
            .user_replicas()
            .count()
            .try_into()
            .expect("must fit");
        match options.replication_factor {
            AlterOptionParameter::Set(_) => {
                if user_replica_count != *new_replication_factor {
                    coord_bail!(
                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
                    );
                }
            }
            _ => {
                *new_replication_factor = user_replica_count;
            }
        }

        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());

                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                _ => {}
            }
        } else if sizes.len() == 1 {
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        // All replica names must follow the managed naming scheme (`r1`,
        // `r2`, ...); anything left over is invalid.
        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

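    /// Converts a managed cluster to an unmanaged one. The existing replicas
    /// are kept; only the cluster configuration changes.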
    async fn sequence_alter_cluster_managed_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

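    /// Alters an unmanaged cluster to a new unmanaged configuration. Changing
    /// the replica set this way is not supported.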
    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
    ) -> Result<(), AdapterError> {
        if !matches!(replicas, AlterOptionParameter::Unchanged) {
            coord_bail!("Cannot alter replicas in unmanaged cluster");
        }

        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    pub(crate) async fn sequence_alter_cluster_rename(
        &mut self,
        ctx: &mut ExecuteContext,
        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = Op::RenameCluster {
            id,
            name,
            to_name,
            check_reserved_names: true,
        };
        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_cluster_swap(
        &mut self,
        ctx: &mut ExecuteContext,
        AlterClusterSwapPlan {
            id_a,
            id_b,
            name_a,
            name_b,
            name_temp,
        }: AlterClusterSwapPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Swap the two names via a temporary name: a -> temp, b -> a,
        // temp -> b, all within a single catalog transaction.
        let op_a = Op::RenameCluster {
            id: id_a,
            name: name_a.clone(),
            to_name: name_temp.clone(),
            check_reserved_names: false,
        };
        let op_b = Op::RenameCluster {
            id: id_b,
            name: name_b.clone(),
            to_name: name_a,
            check_reserved_names: false,
        };
        let op_temp = Op::RenameCluster {
            id: id_a,
            name: name_temp,
            to_name: name_b,
            check_reserved_names: false,
        };

        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
                Box::pin(async {})
            })
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_cluster_replica_rename(
        &mut self,
        session: &Session,
        AlterClusterReplicaRenamePlan {
            cluster_id,
            replica_id,
            name,
            to_name,
        }: AlterClusterReplicaRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::RenameClusterReplica {
            cluster_id,
            replica_id,
            name,
            to_name,
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
            Err(err) => Err(err),
        }
    }

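    /// Handles `ALTER ... SET CLUSTER`. Currently unsupported for all object
    /// types.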
    pub(crate) async fn sequence_alter_set_cluster(
        &self,
        _session: &Session,
        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // No-op await so the method is a genuine async fn.
        async {}.await;
        let entry = self.catalog().get_entry(&id);
        match entry.item().typ() {
            // No object type currently supports being moved to another
            // cluster.
            _ => Err(AdapterError::Unsupported("ALTER SET CLUSTER")),
        }
    }
}

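/// Returns the name of the `index`-th replica of a managed cluster: `r1`,
/// `r2`, and so on.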
fn managed_cluster_replica_name(index: u32) -> String {
    format!("r{}", index + 1)
}

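/// Whether an `ALTER CLUSTER` needs a separate finalization stage to promote
/// pending replicas.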
#[derive(PartialEq)]
pub(crate) enum NeedsFinalization {
    /// Pending replicas were created; the alter must be finalized later.
    Yes,
    /// The alter was applied in full; no finalization is needed.
    No,
}