use std::collections::BTreeSet;
use std::time::{Duration, Instant};

use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::builtin::BUILTINS;
use mz_catalog::memory::objects::{
    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
};
use mz_compute_types::config::ComputeReplicaConfig;
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
    ReplicaLogging,
};
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::role_id::RoleId;
use mz_sql::ast::{Ident, QualifiedReplica};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, ObjectType};
use mz_sql::plan::{
    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
};
use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
use tracing::{Instrument, Span, debug};

use super::return_if_err;
use crate::catalog::{self, Op, ReplicaCreateDropReason};
use crate::coord::{
    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
    Message, PlanValidity, StageResult, Staged,
};
use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};

const PENDING_REPLICA_SUFFIX: &str = "-pending";

impl Staged for ClusterStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            Self::Alter(stage) => &mut stage.validity,
            Self::WaitForHydrated(stage) => &mut stage.validity,
            Self::Finalize(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
        match self {
            Self::Alter(stage) => {
                coord
                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
                    .await
            }
            Self::WaitForHydrated(stage) => {
                let AlterClusterWaitForHydrated {
                    validity,
                    plan,
                    new_config,
                    timeout_time,
                    on_timeout,
                } = stage;
                coord
                    .check_if_pending_replicas_hydrated_stage(
                        ctx.session(),
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                        validity,
                    )
                    .await
            }
            Self::Finalize(stage) => {
                coord
                    .finalize_alter_cluster_stage(
                        ctx.session(),
                        stage.plan.clone(),
                        stage.new_config.clone(),
                    )
                    .await
            }
        }
    }

    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
        Message::ClusterStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
    #[instrument]
    pub(crate) async fn sequence_alter_cluster_staged(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterClusterPlan,
    ) {
        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

    #[instrument]
    async fn alter_cluster_validate(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
    ) -> Result<ClusterStage, AdapterError> {
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::new(),
            Some(plan.id.clone()),
            None,
            session.role_metadata().clone(),
        );
        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
    }

    async fn sequence_alter_cluster_stage(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let AlterClusterPlan {
            id: cluster_id,
            name: _,
            ref options,
            ref strategy,
        } = plan;

        use mz_catalog::memory::objects::ClusterVariant::*;
        use mz_sql::plan::AlterOptionParameter::*;
        let cluster = self.catalog.get_cluster(cluster_id);
        let config = cluster.config.clone();
        let mut new_config = config.clone();

        match (&new_config.variant, &options.managed) {
            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
                // Converting an unmanaged cluster to managed: start from a
                // minimal managed config; the option handling below fills in
                // the requested values.
                let size = "".to_string();
                let disk = false;
                let logging = ReplicaLogging {
                    log_logging: false,
                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                };
                new_config.variant = Managed(ClusterVariantManaged {
                    size,
                    availability_zones: Default::default(),
                    logging,
                    replication_factor: 1,
                    disk,
                    optimizer_feature_overrides: Default::default(),
                    schedule: Default::default(),
                });
            }
        }

        match &mut new_config.variant {
            Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                disk,
                optimizer_feature_overrides: _,
                schedule,
            }) => {
                match &options.size {
                    Set(s) => size.clone_from(s),
                    Reset => coord_bail!("SIZE has no default value"),
                    Unchanged => {}
                }
                match &options.disk {
                    Set(d) => *disk = *d,
                    Reset => *disk = self.catalog.system_config().disk_cluster_replicas_default(),
                    Unchanged => {}
                }
                match &options.availability_zones {
                    Set(az) => availability_zones.clone_from(az),
                    Reset => *availability_zones = Default::default(),
                    Unchanged => {}
                }
                match &options.introspection_debugging {
                    Set(id) => logging.log_logging = *id,
                    Reset => logging.log_logging = false,
                    Unchanged => {}
                }
                match &options.introspection_interval {
                    Set(ii) => logging.interval = ii.0,
                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                    Unchanged => {}
                }
                match &options.replication_factor {
                    Set(rf) => *replication_factor = *rf,
                    Reset => {
                        *replication_factor = self
                            .catalog
                            .system_config()
                            .default_cluster_replication_factor()
                    }
                    Unchanged => {}
                }
                match &options.schedule {
                    Set(new_schedule) => {
                        *schedule = new_schedule.clone();
                    }
                    Reset => *schedule = Default::default(),
                    Unchanged => {}
                }
                if !matches!(options.replicas, Unchanged) {
                    coord_bail!("Cannot change REPLICAS of managed clusters");
                }
            }
            Unmanaged => {
                if !matches!(options.size, Unchanged) {
                    coord_bail!("Cannot change SIZE of unmanaged clusters");
                }
                if !matches!(options.availability_zones, Unchanged) {
                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
                }
                if !matches!(options.introspection_debugging, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION DEBUGGING of unmanaged clusters");
                }
                if !matches!(options.introspection_interval, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
                }
                if !matches!(options.replication_factor, Unchanged) {
                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
                }
            }
        }

        match &options.workload_class {
            Set(wc) => new_config.workload_class.clone_from(wc),
            Reset => new_config.workload_class = None,
            Unchanged => {}
        }

        if new_config == config {
            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
                ObjectType::Cluster,
            )));
        }

        let new_workload_class = new_config.workload_class.clone();
        match (&config.variant, &new_config.variant) {
            (Managed(_), Managed(new_config_managed)) => {
                let alter_followup = self
                    .sequence_alter_cluster_managed_to_managed(
                        Some(session),
                        cluster_id,
                        new_config.clone(),
                        ReplicaCreateDropReason::Manual,
                        strategy.clone(),
                    )
                    .await?;
                if alter_followup == NeedsFinalization::Yes {
                    // Track the in-progress alter on this connection so it can
                    // be cleaned up if the connection goes away.
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .insert(cluster_id.clone());
                    let new_config_managed = new_config_managed.clone();
                    return match &strategy {
                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
                                .into(),
                        )),
                        AlterClusterPlanStrategy::For(duration) => {
                            let span = Span::current();
                            let plan = plan.clone();
                            let duration = *duration;
                            Ok(StageResult::Handle(mz_ore::task::spawn(
                                || "Finalize Alter Cluster",
                                async move {
                                    tokio::time::sleep(duration).await;
                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                        validity,
                                        plan,
                                        new_config: new_config_managed,
                                    });
                                    Ok(Box::new(stage))
                                }
                                .instrument(span),
                            )))
                        }
                        AlterClusterPlanStrategy::UntilReady {
                            timeout,
                            on_timeout,
                        } => Ok(StageResult::Immediate(Box::new(
                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                                validity,
                                plan: plan.clone(),
                                new_config: new_config_managed.clone(),
                                timeout_time: Instant::now() + timeout.to_owned(),
                                on_timeout: on_timeout.to_owned(),
                            }),
                        ))),
                    };
                }
            }
            (Unmanaged, Managed(_)) => {
                self.sequence_alter_cluster_unmanaged_to_managed(
                    session,
                    cluster_id,
                    new_config,
                    options.to_owned(),
                )
                .await?;
            }
            (Managed(_), Unmanaged) => {
                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
                    .await?;
            }
            (Unmanaged, Unmanaged) => {
                self.sequence_alter_cluster_unmanaged_to_unmanaged(
                    session,
                    cluster_id,
                    new_config,
                    options.replicas.clone(),
                )
                .await?;
            }
        }

        self.controller
            .update_cluster_workload_class(cluster_id, new_workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

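    /// Finalizes an `ALTER CLUSTER`, dropping the old replicas and promoting
    /// the pending ones: each pending replica has its pending flag cleared and
    /// its `-pending` name suffix stripped, and the cluster's config is
    /// updated to the new variant.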
    async fn finalize_alter_cluster_stage(
        &mut self,
        session: &Session,
        AlterClusterPlan {
            id: cluster_id,
            name: cluster_name,
            ..
        }: AlterClusterPlan,
        new_config: ClusterVariantManaged,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let workload_class = cluster.config.workload_class.clone();
        let mut ops = vec![];

        // Gather the old replicas (non-pending, non-internal) for removal.
        let remove_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if !r.config.location.pending() && !r.config.location.internal() {
                    Some(catalog::DropObjectInfo::ClusterReplica((
                        cluster_id.clone(),
                        r.replica_id,
                        ReplicaCreateDropReason::Manual,
                    )))
                } else {
                    None
                }
            })
            .collect();
        ops.push(catalog::Op::DropObjects(remove_replicas));

        // Promote the pending replicas: rename them to drop the pending
        // suffix and clear their pending flag.
        let finalize_replicas: Vec<catalog::Op> = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    let cluster_ident = match Ident::new(cluster.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing cluster name",
                                err,
                            )));
                        }
                    };
                    let replica_ident = match Ident::new(r.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing replica name",
                                err,
                            )));
                        }
                    };
                    Some(Ok((cluster_ident, replica_ident, r)))
                } else {
                    None
                }
            })
            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
            .into_iter()
            .flat_map(|(cluster_ident, replica_ident, replica)| {
                let mut new_replica_config = replica.config.clone();
                debug!("Promoting replica: {}", replica.name);
                match new_replica_config.location {
                    mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
                        ref mut pending,
                        ..
                    }) => {
                        *pending = false;
                    }
                    _ => {}
                }

                let mut replica_ops = vec![];
                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
                if let Some(to_name) = to_name {
                    replica_ops.push(catalog::Op::RenameClusterReplica {
                        cluster_id: cluster_id.clone(),
                        replica_id: replica.replica_id.to_owned(),
                        name: QualifiedReplica {
                            cluster: cluster_ident,
                            replica: replica_ident,
                        },
                        to_name: to_name.to_owned(),
                    });
                }
                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
                    cluster_id,
                    replica_id: replica.replica_id.to_owned(),
                    config: new_replica_config,
                });
                replica_ops
            })
            .collect();

        ops.extend(finalize_replicas);

        ops.push(Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: ClusterConfig {
                variant: ClusterVariant::Managed(new_config),
                workload_class: workload_class.clone(),
            },
        });
        self.catalog_transact(Some(session), ops).await?;
        // The alter is complete; it is no longer pending on this connection.
        self.active_conns
            .get_mut(session.conn_id())
            .expect("There must be an active connection")
            .pending_cluster_alters
            .remove(&cluster_id);

        self.controller
            .update_cluster_workload_class(cluster_id, workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

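    /// Checks whether the pending replicas of an in-progress `ALTER CLUSTER`
    /// are hydrated. If so, proceeds to finalization; if the timeout has
    /// passed, applies the configured `OnTimeoutAction`; otherwise schedules
    /// another check.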
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        // Collect the pending replicas; these are the ones we wait on.
        let cluster = self.catalog.get_cluster(plan.id);
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        // If we've exceeded the timeout, apply the ON TIMEOUT action.
        if Instant::now() > timeout_time {
            match on_timeout {
                // Roll back: drop the pending replicas and report the timeout.
                OnTimeoutAction::Rollback => {
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                // Commit: proceed to finalization even though hydration did
                // not complete within the timeout.
                OnTimeoutAction::Commit => {
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // Both compute and storage are hydrated on the pending
                    // replicas: proceed to finalization.
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config,
                    })))
                } else {
                    // Not hydrated yet: wait a moment, then re-enter this
                    // stage to check again.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }

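    /// Creates a new cluster from a `CREATE CLUSTER` plan, along with the
    /// catalog ops for its initial replicas.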
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster(
        &mut self,
        session: &Session,
        CreateClusterPlan {
            name,
            variant,
            workload_class,
        }: CreateClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_cluster");

        let id_ts = self.get_catalog_write_ts().await;
        let id = self.catalog_mut().allocate_user_cluster_id(id_ts).await?;
        // The builtin introspection sources (logs) are installed on every
        // cluster.
        let introspection_sources = BUILTINS::logs().collect();
        let cluster_variant = match &variant {
            CreateClusterVariant::Managed(plan) => {
                let logging = if let Some(config) = plan.compute.introspection {
                    ReplicaLogging {
                        log_logging: config.debugging,
                        interval: Some(config.interval),
                    }
                } else {
                    ReplicaLogging::default()
                };
                ClusterVariant::Managed(ClusterVariantManaged {
                    size: plan.size.clone(),
                    availability_zones: plan.availability_zones.clone(),
                    logging,
                    replication_factor: plan.replication_factor,
                    disk: plan.disk,
                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
                    schedule: plan.schedule.clone(),
                })
            }
            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
        };
        let config = ClusterConfig {
            variant: cluster_variant,
            workload_class,
        };
        let ops = vec![catalog::Op::CreateCluster {
            id,
            name: name.clone(),
            introspection_sources,
            owner_id: *session.current_role_id(),
            config,
        }];

        match variant {
            CreateClusterVariant::Managed(plan) => {
                self.sequence_create_managed_cluster(session, plan, id, ops)
                    .await
            }
            CreateClusterVariant::Unmanaged(plan) => {
                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
                    .await
            }
        }
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_managed_cluster(
        &mut self,
        session: &Session,
        CreateClusterManagedPlan {
            availability_zones,
            compute,
            replication_factor,
            size,
            disk,
            optimizer_feature_overrides: _,
            schedule: _,
        }: CreateClusterManagedPlan,
        cluster_id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_managed_cluster");

        self.ensure_valid_azs(availability_zones.iter())?;

        let role_id = session.role_metadata().current_role;
        self.catalog.ensure_valid_replica_size(
            &self
                .catalog()
                .get_role_allowed_cluster_sizes(&Some(role_id)),
            &size,
        )?;

        // Only user clusters count toward the per-cluster replica limit.
        if cluster_id.is_user() {
            self.validate_resource_limit(
                0,
                i64::from(replication_factor),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
            self.create_managed_cluster_replica_op(
                cluster_id,
                replica_name,
                &compute,
                &size,
                &mut ops,
                if availability_zones.is_empty() {
                    None
                } else {
                    Some(availability_zones.as_ref())
                },
                disk,
                false,
                *session.current_role_id(),
                ReplicaCreateDropReason::Manual,
            )?;
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(cluster_id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

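    /// Pushes a `CreateClusterReplica` op for one replica of a managed
    /// cluster onto `ops`, after concretizing the replica's location against
    /// the sizes the owner role is allowed to use.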
    fn create_managed_cluster_replica_op(
        &self,
        cluster_id: ClusterId,
        name: String,
        compute: &mz_sql::plan::ComputeReplicaConfig,
        size: &String,
        ops: &mut Vec<Op>,
        azs: Option<&[String]>,
        disk: bool,
        pending: bool,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    ) -> Result<(), AdapterError> {
        let location = mz_catalog::durable::ReplicaLocation::Managed {
            availability_zone: None,
            billed_as: None,
            disk,
            internal: false,
            size: size.clone(),
            pending,
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
                azs,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        ops.push(catalog::Op::CreateClusterReplica {
            cluster_id,
            name,
            config,
            owner_id,
            reason,
        });
        Ok(())
    }

    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
        &self,
        azs: I,
    ) -> Result<(), AdapterError> {
        let cat_azs = self.catalog().state().availability_zones();
        for az in azs.into_iter() {
            if !cat_azs.contains(az) {
                return Err(AdapterError::InvalidClusterReplicaAz {
                    az: az.to_string(),
                    expected: cat_azs.to_vec(),
                });
            }
        }
        Ok(())
    }

    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        // Only user clusters count toward the per-cluster replica limit.
        if id.is_user() {
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            // Translate the plan's replica config into a durable location.
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    storage_addrs,
                    computectl_addrs,
                    compute_addrs,
                    workers,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        storage_addrs,
                        computectl_addrs,
                        compute_addrs,
                        workers,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    disk,
                    internal,
                    size,
                } => {
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        disk,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            let role_id = session.role_metadata().current_role;
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

    async fn create_cluster(&mut self, cluster_id: ClusterId) {
        let Coordinator {
            catalog,
            controller,
            ..
        } = self;
        let cluster = catalog.get_cluster(cluster_id);
        let cluster_id = cluster.id;
        let introspection_source_ids: Vec<_> =
            cluster.log_indexes.iter().map(|(_, id)| *id).collect();

        controller
            .create_cluster(
                cluster_id,
                mz_controller::clusters::ClusterConfig {
                    arranged_logs: cluster.log_indexes.clone(),
                    workload_class: cluster.config.workload_class.clone(),
                },
            )
            .expect("creating cluster must not fail");

        let replica_ids: Vec<_> = cluster.replicas().map(|r| r.replica_id).collect();
        for replica_id in replica_ids {
            self.create_cluster_replica(cluster_id, replica_id).await;
        }

        if !introspection_source_ids.is_empty() {
            self.initialize_compute_read_policies(
                introspection_source_ids,
                cluster_id,
                CompactionWindow::Default,
            )
            .await;
        }
    }

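    /// Creates a new replica from a `CREATE CLUSTER REPLICA` plan and
    /// registers it with the controller.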
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Translate the plan's replica config into a durable location.
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                storage_addrs,
                computectl_addrs,
                compute_addrs,
                workers,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    storage_addrs,
                    computectl_addrs,
                    compute_addrs,
                    workers,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                disk,
                internal,
                size,
            } => {
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    disk,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let role_id = session.role_metadata().current_role;
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        // Replicas inherit the owner of their cluster.
        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        let id = self
            .catalog()
            .resolve_replica_in_cluster(&cluster_id, &name)
            .expect("just created")
            .replica_id();
        self.create_cluster_replica(cluster_id, id).await;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }

    async fn create_cluster_replica(&mut self, cluster_id: ClusterId, replica_id: ReplicaId) {
        let cluster = self.catalog().get_cluster(cluster_id);
        let role = cluster.role();
        let replica_config = cluster
            .replica(replica_id)
            .expect("known to exist")
            .config
            .clone();

        let enable_worker_core_affinity =
            self.catalog().system_config().enable_worker_core_affinity();

        self.controller
            .create_replica(
                cluster_id,
                replica_id,
                role,
                replica_config,
                enable_worker_core_affinity,
            )
            .expect("creating replicas must not fail");

        self.install_introspection_subscribes(cluster_id, replica_id)
            .await;
    }

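    /// Alters the managed configuration of a managed cluster.
    ///
    /// Depending on the change and the chosen strategy, replicas may be
    /// dropped, created immediately, or created in a pending state. Returns
    /// `NeedsFinalization::Yes` when pending replicas were created and a
    /// separate finalization step must promote them.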
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut create_cluster_replicas = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            disk,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            disk: new_disk,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // Disallow concurrent alters: pending replicas mean a previous alter
        // is still in flight.
        if cluster.replicas().any(|r| r.config.location.pending()) {
            coord_bail!("Cannot alter clusters with pending updates.")
        }

        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // When increasing the replication factor of a user cluster, check the
        // per-cluster replica limit.
        if new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
            || new_disk != disk
        {
            self.ensure_valid_azs(new_availability_zones.iter())?;
            // These changes require replacing all replicas. With strategy
            // `None` the replacement happens immediately; otherwise the new
            // replicas are created as pending and promoted during
            // finalization.
            match strategy {
                AlterClusterPlanStrategy::None => {
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            *new_disk,
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                }
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            *new_disk,
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Only the replication factor decreased: drop the excess replicas.
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Only the replication factor increased: create the additional
            // replicas.
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    Some(new_availability_zones.as_ref()),
                    *new_disk,
                    false,
                    owner_id,
                    reason.clone(),
                )?;
                create_cluster_replicas.push((cluster_id, name))
            }
        }

        // When finalization is needed, the config update is deferred until the
        // pending replicas have been promoted.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name,
                    config: new_config,
                });
            }
            _ => {}
        }
        self.catalog_transact(session, ops.clone()).await?;
        for (cluster_id, replica_name) in create_cluster_replicas {
            let replica_id = self
                .catalog()
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            self.create_cluster_replica(cluster_id, replica_id).await;
        }
        Ok(finalization_needed)
    }

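    /// Converts an unmanaged cluster into a managed one, validating that the
    /// existing replicas' names, sizes, and disk settings are consistent with
    /// the requested managed configuration.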
    async fn sequence_alter_cluster_unmanaged_to_managed(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        mut new_config: ClusterConfig,
        options: PlanClusterOption,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let cluster_name = cluster.name().to_string();

        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: _,
            disk: new_disk,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &mut new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // The requested replication factor must match the number of existing
        // user replicas, or, if unspecified, is derived from them.
        let user_replica_count = cluster
            .user_replicas()
            .count()
            .try_into()
            .expect("must_fit");
        match options.replication_factor {
            AlterOptionParameter::Set(_) => {
                if user_replica_count != *new_replication_factor {
                    coord_bail!(
                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
                    );
                }
            }
            _ => {
                *new_replication_factor = user_replica_count;
            }
        }

        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();
        let mut disks = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        // Collect the names, sizes, and disk settings of the existing
        // replicas for validation below.
        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());
                    disks.insert(location.disk);

                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                _ => {}
            }
        } else if sizes.len() == 1 {
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        if disks.len() == 1 {
            let disk = disks.into_iter().next().expect("must exist");
            match &options.disk {
                AlterOptionParameter::Set(ds) if *ds != disk => {
                    coord_bail!(
                        "Cluster replicas with DISK {disk} do not match expected DISK {ds}"
                    );
                }
                _ => *new_disk = disk,
            }
        } else if !disks.is_empty() {
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica DISK options"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

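    /// Converts a managed cluster into an unmanaged one. Only the cluster's
    /// config changes; the existing replicas are left in place.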
    async fn sequence_alter_cluster_managed_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
    ) -> Result<(), AdapterError> {
        if !matches!(replicas, AlterOptionParameter::Unchanged) {
            coord_bail!("Cannot alter replicas in unmanaged cluster");
        }

        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    pub(crate) async fn sequence_alter_cluster_rename(
        &mut self,
        session: &mut Session,
        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = Op::RenameCluster {
            id,
            name,
            to_name,
            check_reserved_names: true,
        };
        match self
            .catalog_transact_with_ddl_transaction(session, vec![op])
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_cluster_swap(
        &mut self,
        session: &mut Session,
        AlterClusterSwapPlan {
            id_a,
            id_b,
            name_a,
            name_b,
            name_temp,
        }: AlterClusterSwapPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Swap the names via a temporary name: a -> temp, b -> a, temp -> b.
        let op_a = Op::RenameCluster {
            id: id_a,
            name: name_a.clone(),
            to_name: name_temp.clone(),
            check_reserved_names: false,
        };
        let op_b = Op::RenameCluster {
            id: id_b,
            name: name_b.clone(),
            to_name: name_a,
            check_reserved_names: false,
        };
        let op_temp = Op::RenameCluster {
            id: id_a,
            name: name_temp,
            to_name: name_b,
            check_reserved_names: false,
        };

        match self
            .catalog_transact_with_ddl_transaction(session, vec![op_a, op_b, op_temp])
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_cluster_replica_rename(
        &mut self,
        session: &Session,
        AlterClusterReplicaRenamePlan {
            cluster_id,
            replica_id,
            name,
            to_name,
        }: AlterClusterReplicaRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::RenameClusterReplica {
            cluster_id,
            replica_id,
            name,
            to_name,
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
            Err(err) => Err(err),
        }
    }

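    /// Handles `ALTER ... SET CLUSTER`. No object type currently supports
    /// being moved between clusters, so this always returns an unsupported
    /// error.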
    pub(crate) async fn sequence_alter_set_cluster(
        &self,
        _session: &Session,
        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // No-op await: this function does no asynchronous work yet, but is
        // async for consistency with the other sequencing methods.
        async {}.await;
        let entry = self.catalog().get_entry(&id);
        match entry.item().typ() {
            // Sequence other object types here when they gain support for
            // ALTER ... SET CLUSTER.
            _ => Err(AdapterError::Unsupported("ALTER SET CLUSTER")),
        }
    }
}

/// Returns the name of the `index`th replica of a managed cluster. Managed
/// replica names are one-based: `r1`, `r2`, ...
fn managed_cluster_replica_name(index: u32) -> String {
    format!("r{}", index + 1)
}

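/// Whether an `ALTER CLUSTER` requires a separate finalization step to
/// promote pending replicas.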
#[derive(PartialEq)]
pub(crate) enum NeedsFinalization {
    /// Pending replicas were created and must be promoted before the alter is
    /// complete.
    Yes,
    No,
}