1use std::collections::{BTreeMap, BTreeSet};
11use std::time::{Duration, Instant};
12
13use itertools::Itertools;
14use maplit::btreeset;
15use mz_catalog::builtin::BUILTINS;
16use mz_catalog::memory::objects::{
17 ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
18};
19use mz_compute_types::config::ComputeReplicaConfig;
20use mz_controller::clusters::{
21 ManagedReplicaLocation, ReplicaConfig, ReplicaLocation, ReplicaLogging,
22};
23use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
24use mz_ore::cast::CastFrom;
25use mz_ore::collections::CollectionExt;
26use mz_ore::instrument;
27use mz_repr::role_id::RoleId;
28use mz_sql::ast::{Ident, QualifiedReplica};
29use mz_sql::catalog::{CatalogCluster, ObjectType};
30use mz_sql::plan::{
31 self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
32 AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
33 ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
34 CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
35};
36use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
39use tracing::{Instrument, Span, debug};
40
41use super::return_if_err;
42use crate::AdapterError::AlterClusterWhilePendingReplicas;
43use crate::catalog::{self, Op, ReplicaCreateDropReason};
44use crate::coord::{
45 AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
46 Message, PlanValidity, StageResult, Staged,
47};
48use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};
49
/// Suffix appended to the name of each replica created in the pending state by an
/// `AlterClusterPlanStrategy::For` / `UntilReady` reconfiguration. The suffix is stripped
/// again when the pending replica is promoted in `finalize_alter_cluster_stage`.
const PENDING_REPLICA_SUFFIX: &str = "-pending";
51
52impl Staged for ClusterStage {
53 type Ctx = ExecuteContext;
54
55 fn validity(&mut self) -> &mut PlanValidity {
56 match self {
57 Self::Alter(stage) => &mut stage.validity,
58 Self::WaitForHydrated(stage) => &mut stage.validity,
59 Self::Finalize(stage) => &mut stage.validity,
60 }
61 }
62
63 async fn stage(
64 self,
65 coord: &mut Coordinator,
66 ctx: &mut ExecuteContext,
67 ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
68 match self {
69 Self::Alter(stage) => {
70 coord
71 .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
72 .await
73 }
74 Self::WaitForHydrated(stage) => {
75 let AlterClusterWaitForHydrated {
76 validity,
77 plan,
78 new_config,
79 workload_class,
80 timeout_time,
81 on_timeout,
82 } = stage;
83 coord
84 .check_if_pending_replicas_hydrated_stage(
85 ctx.session(),
86 plan,
87 new_config,
88 workload_class,
89 timeout_time,
90 on_timeout,
91 validity,
92 )
93 .await
94 }
95 Self::Finalize(stage) => {
96 coord
97 .finalize_alter_cluster_stage(
98 ctx.session(),
99 stage.plan.clone(),
100 stage.new_config.clone(),
101 stage.workload_class.clone(),
102 )
103 .await
104 }
105 }
106 }
107
108 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
109 Message::ClusterStageReady {
110 ctx,
111 span,
112 stage: self,
113 }
114 }
115
116 fn cancel_enabled(&self) -> bool {
117 true
118 }
119}
120
121impl Coordinator {
122 #[instrument]
123 pub(crate) async fn sequence_alter_cluster_staged(
124 &mut self,
125 ctx: ExecuteContext,
126 plan: plan::AlterClusterPlan,
127 ) {
128 let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
129 self.sequence_staged(ctx, Span::current(), stage).await;
130 }
131
132 #[instrument]
133 async fn alter_cluster_validate(
134 &self,
135 session: &Session,
136 plan: plan::AlterClusterPlan,
137 ) -> Result<ClusterStage, AdapterError> {
138 let validity = PlanValidity::new(
139 self.catalog().transient_revision(),
140 BTreeSet::new(),
141 Some(plan.id.clone()),
142 None,
143 session.role_metadata().clone(),
144 );
145 Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
146 }
147
148 async fn sequence_alter_cluster_stage(
149 &mut self,
150 session: &Session,
151 plan: plan::AlterClusterPlan,
152 validity: PlanValidity,
153 ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
154 let AlterClusterPlan {
155 id: cluster_id,
156 name: _,
157 ref options,
158 ref strategy,
159 } = plan;
160
161 use mz_catalog::memory::objects::ClusterVariant::*;
162 use mz_sql::plan::AlterOptionParameter::*;
163 let cluster = self.catalog.get_cluster(cluster_id);
164 let config = cluster.config.clone();
165 let mut new_config = config.clone();
166
167 match (&new_config.variant, &options.managed) {
168 (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
169 (Managed(_), Set(false)) => new_config.variant = Unmanaged,
170 (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
171 (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
172 let size = "".to_string();
176 let logging = ReplicaLogging {
177 log_logging: false,
178 interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
179 };
180 new_config.variant = Managed(ClusterVariantManaged {
181 size,
182 availability_zones: Default::default(),
183 logging,
184 replication_factor: 1,
185 optimizer_feature_overrides: Default::default(),
186 schedule: Default::default(),
187 auto_scaling_strategy: None,
188 reconfiguration: None,
189 burst: None,
190 });
191 }
192 }
193
194 match &mut new_config.variant {
195 Managed(ClusterVariantManaged {
196 size,
197 availability_zones,
198 logging,
199 replication_factor,
200 optimizer_feature_overrides: _,
201 schedule,
202 auto_scaling_strategy: _,
203 reconfiguration: _,
204 burst: _,
205 }) => {
206 match &options.size {
207 Set(s) => size.clone_from(s),
208 Reset => coord_bail!("SIZE has no default value"),
209 Unchanged => {}
210 }
211 match &options.availability_zones {
212 Set(az) => availability_zones.clone_from(az),
213 Reset => *availability_zones = Default::default(),
214 Unchanged => {}
215 }
216 match &options.introspection_debugging {
217 Set(id) => logging.log_logging = *id,
218 Reset => logging.log_logging = false,
219 Unchanged => {}
220 }
221 match &options.introspection_interval {
222 Set(ii) => logging.interval = ii.0,
223 Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
224 Unchanged => {}
225 }
226 match &options.replication_factor {
227 Set(rf) => *replication_factor = *rf,
228 Reset => {
229 *replication_factor = self
230 .catalog
231 .system_config()
232 .default_cluster_replication_factor()
233 }
234 Unchanged => {}
235 }
236 match &options.schedule {
237 Set(new_schedule) => {
238 *schedule = new_schedule.clone();
239 }
240 Reset => *schedule = Default::default(),
241 Unchanged => {}
242 }
243 if !matches!(options.replicas, Unchanged) {
244 coord_bail!("Cannot change REPLICAS of managed clusters");
245 }
246 }
247 Unmanaged => {
248 if !matches!(options.size, Unchanged) {
249 coord_bail!("Cannot change SIZE of unmanaged clusters");
250 }
251 if !matches!(options.availability_zones, Unchanged) {
252 coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
253 }
254 if !matches!(options.introspection_debugging, Unchanged) {
255 coord_bail!("Cannot change INTROSPECTION DEGUBBING of unmanaged clusters");
256 }
257 if !matches!(options.introspection_interval, Unchanged) {
258 coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
259 }
260 if !matches!(options.replication_factor, Unchanged) {
261 coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
262 }
263 }
264 }
265
266 match &options.workload_class {
267 Set(wc) => new_config.workload_class.clone_from(wc),
268 Reset => new_config.workload_class = None,
269 Unchanged => {}
270 }
271
272 if new_config == config {
273 return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
274 ObjectType::Cluster,
275 )));
276 }
277
278 match (&config.variant, &new_config.variant) {
279 (Managed(_), Managed(new_config_managed)) => {
280 let alter_followup = self
281 .sequence_alter_cluster_managed_to_managed(
282 Some(session),
283 cluster_id,
284 new_config.clone(),
285 ReplicaCreateDropReason::Manual,
286 strategy.clone(),
287 )
288 .await?;
289 if alter_followup == NeedsFinalization::Yes {
290 self.active_conns
293 .get_mut(session.conn_id())
294 .expect("There must be an active connection")
295 .pending_cluster_alters
296 .insert(cluster_id.clone());
297 let new_config_managed = new_config_managed.clone();
298 return match &strategy {
299 AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
300 "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
301 .into(),
302 )),
303 AlterClusterPlanStrategy::For(duration) => {
304 let span = Span::current();
305 let plan = plan.clone();
306 let duration = duration.clone().to_owned();
307 let workload_class = new_config.workload_class.clone();
308 Ok(StageResult::Handle(mz_ore::task::spawn(
309 || "Finalize Alter Cluster",
310 async move {
311 tokio::time::sleep(duration).await;
312 let stage = ClusterStage::Finalize(AlterClusterFinalize {
313 validity,
314 plan,
315 new_config: new_config_managed,
316 workload_class,
317 });
318 Ok(Box::new(stage))
319 }
320 .instrument(span),
321 )))
322 }
323 AlterClusterPlanStrategy::UntilReady {
324 timeout,
325 on_timeout,
326 } => Ok(StageResult::Immediate(Box::new(
327 ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
328 validity,
329 plan: plan.clone(),
330 new_config: new_config_managed.clone(),
331 workload_class: new_config.workload_class.clone(),
332 timeout_time: Instant::now() + timeout.to_owned(),
333 on_timeout: on_timeout.to_owned(),
334 }),
335 ))),
336 };
337 }
338 }
339 (Unmanaged, Managed(_)) => {
340 self.sequence_alter_cluster_unmanaged_to_managed(
341 session,
342 cluster_id,
343 new_config,
344 options.to_owned(),
345 )
346 .await?;
347 }
348 (Managed(_), Unmanaged) => {
349 self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
350 .await?;
351 }
352 (Unmanaged, Unmanaged) => {
353 self.sequence_alter_cluster_unmanaged_to_unmanaged(
354 session,
355 cluster_id,
356 new_config,
357 options.replicas.clone(),
358 )
359 .await?;
360 }
361 }
362
363 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
364 ObjectType::Cluster,
365 )))
366 }
367
368 async fn finalize_alter_cluster_stage(
369 &mut self,
370 session: &Session,
371 AlterClusterPlan {
372 id: cluster_id,
373 name: cluster_name,
374 ..
375 }: AlterClusterPlan,
376 new_config: ClusterVariantManaged,
377 workload_class: Option<String>,
378 ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
379 let cluster = self.catalog.get_cluster(cluster_id);
380 let mut ops = vec![];
381
382 let remove_replicas = cluster
385 .replicas()
386 .filter_map(|r| {
387 if !r.config.location.pending() && !r.config.location.internal() {
388 Some(catalog::DropObjectInfo::ClusterReplica((
389 cluster_id.clone(),
390 r.replica_id,
391 ReplicaCreateDropReason::Manual,
392 )))
393 } else {
394 None
395 }
396 })
397 .collect();
398 ops.push(catalog::Op::DropObjects(remove_replicas));
399
400 let finalize_replicas: Vec<catalog::Op> = cluster
403 .replicas()
404 .filter_map(|r| {
405 if r.config.location.pending() {
406 let cluster_ident = match Ident::new(cluster.name.clone()) {
407 Ok(id) => id,
408 Err(err) => {
409 return Some(Err(AdapterError::internal(
410 "Unexpected error parsing cluster name",
411 err,
412 )));
413 }
414 };
415 let replica_ident = match Ident::new(r.name.clone()) {
416 Ok(id) => id,
417 Err(err) => {
418 return Some(Err(AdapterError::internal(
419 "Unexpected error parsing replica name",
420 err,
421 )));
422 }
423 };
424 Some(Ok((cluster_ident, replica_ident, r)))
425 } else {
426 None
427 }
428 })
429 .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
432 .into_iter()
433 .map(|(cluster_ident, replica_ident, replica)| {
434 let mut new_replica_config = replica.config.clone();
435 debug!("Promoting replica: {}", replica.name);
436 match new_replica_config.location {
437 mz_controller::clusters::ReplicaLocation::Managed(ManagedReplicaLocation {
438 ref mut pending,
439 ..
440 }) => {
441 *pending = false;
442 }
443 mz_controller::clusters::ReplicaLocation::Unmanaged(_) => {}
444 }
445
446 let mut replica_ops = vec![];
447 let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
448 if let Some(to_name) = to_name {
449 replica_ops.push(catalog::Op::RenameClusterReplica {
450 cluster_id: cluster_id.clone(),
451 replica_id: replica.replica_id.to_owned(),
452 name: QualifiedReplica {
453 cluster: cluster_ident,
454 replica: replica_ident,
455 },
456 to_name: to_name.to_owned(),
457 });
458 }
459 replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
460 cluster_id,
461 replica_id: replica.replica_id.to_owned(),
462 config: new_replica_config,
463 });
464 replica_ops
465 })
466 .flatten()
467 .collect();
468
469 ops.extend(finalize_replicas);
470
471 ops.push(Op::UpdateClusterConfig {
473 id: cluster_id,
474 name: cluster_name,
475 config: ClusterConfig {
476 variant: ClusterVariant::Managed(new_config),
477 workload_class: workload_class.clone(),
478 },
479 });
480 self.catalog_transact(Some(session), ops).await?;
481 self.active_conns
484 .get_mut(session.conn_id())
485 .expect("There must be an active connection")
486 .pending_cluster_alters
487 .remove(&cluster_id);
488
489 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
490 ObjectType::Cluster,
491 )))
492 }
493
494 async fn check_if_pending_replicas_hydrated_stage(
495 &mut self,
496 session: &Session,
497 plan: AlterClusterPlan,
498 new_config: ClusterVariantManaged,
499 workload_class: Option<String>,
500 timeout_time: Instant,
501 on_timeout: OnTimeoutAction,
502 validity: PlanValidity,
503 ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
504 let cluster = self.catalog.get_cluster(plan.id);
506 let pending_replicas = cluster
507 .replicas()
508 .filter_map(|r| {
509 if r.config.location.pending() {
510 Some(r.replica_id.clone())
511 } else {
512 None
513 }
514 })
515 .collect_vec();
516 if Instant::now() > timeout_time {
518 match on_timeout {
520 OnTimeoutAction::Rollback => {
521 self.active_conns
522 .get_mut(session.conn_id())
523 .expect("There must be an active connection")
524 .pending_cluster_alters
525 .remove(&cluster.id);
526 self.drop_reconfiguration_replicas(btreeset!(cluster.id))
527 .await?;
528 return Err(AdapterError::AlterClusterTimeout);
529 }
530 OnTimeoutAction::Commit => {
531 let span = Span::current();
532 let poll_duration = self
533 .catalog
534 .system_config()
535 .cluster_alter_check_ready_interval()
536 .clone();
537 return Ok(StageResult::Handle(mz_ore::task::spawn(
538 || "Finalize Alter Cluster",
539 async move {
540 tokio::time::sleep(poll_duration).await;
541 let stage = ClusterStage::Finalize(AlterClusterFinalize {
542 validity,
543 plan,
544 new_config,
545 workload_class,
546 });
547 Ok(Box::new(stage))
548 }
549 .instrument(span),
550 )));
551 }
552 }
553 }
554 let compute_hydrated_fut = self
555 .controller
556 .compute
557 .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
558 .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
559
560 let storage_hydrated = self
561 .controller
562 .storage
563 .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
564 .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
565
566 let span = Span::current();
567 Ok(StageResult::Handle(mz_ore::task::spawn(
568 || "Alter Cluster: wait for hydrated",
569 async move {
570 let compute_hydrated = compute_hydrated_fut
571 .await
572 .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;
573
574 if compute_hydrated && storage_hydrated {
575 Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
577 validity,
578 plan,
579 new_config: new_config.clone(),
580 workload_class: workload_class.clone(),
581 })))
582 } else {
583 tokio::time::sleep(Duration::from_secs(1)).await;
585 let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
586 validity,
587 plan,
588 new_config,
589 workload_class,
590 timeout_time,
591 on_timeout,
592 });
593 Ok(Box::new(stage))
594 }
595 }
596 .instrument(span),
597 )))
598 }
599
600 #[mz_ore::instrument(level = "debug")]
601 pub(crate) async fn sequence_create_cluster(
602 &mut self,
603 session: &Session,
604 CreateClusterPlan {
605 name,
606 variant,
607 workload_class,
608 }: CreateClusterPlan,
609 ) -> Result<ExecuteResponse, AdapterError> {
610 tracing::debug!("sequence_create_cluster");
611
612 let id_ts = self.get_catalog_write_ts().await;
613 let id = self.catalog().allocate_user_cluster_id(id_ts).await?;
614 let introspection_sources = BUILTINS::logs().collect();
619 let cluster_variant = match &variant {
620 CreateClusterVariant::Managed(plan) => {
621 let logging = if let Some(config) = plan.compute.introspection {
622 ReplicaLogging {
623 log_logging: config.debugging,
624 interval: Some(config.interval),
625 }
626 } else {
627 ReplicaLogging::default()
628 };
629 ClusterVariant::Managed(ClusterVariantManaged {
630 size: plan.size.clone(),
631 availability_zones: plan.availability_zones.clone(),
632 logging,
633 replication_factor: plan.replication_factor,
634 optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
635 schedule: plan.schedule.clone(),
636 auto_scaling_strategy: None,
637 reconfiguration: None,
638 burst: None,
639 })
640 }
641 CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
642 };
643 let config = ClusterConfig {
644 variant: cluster_variant,
645 workload_class,
646 };
647 let ops = vec![catalog::Op::CreateCluster {
648 id,
649 name: name.clone(),
650 introspection_sources,
651 owner_id: *session.current_role_id(),
652 config,
653 }];
654
655 match variant {
656 CreateClusterVariant::Managed(plan) => {
657 self.sequence_create_managed_cluster(session, plan, id, ops)
658 .await
659 }
660 CreateClusterVariant::Unmanaged(plan) => {
661 self.sequence_create_unmanaged_cluster(session, plan, id, ops)
662 .await
663 }
664 }
665 }
666
667 #[mz_ore::instrument(level = "debug")]
668 async fn sequence_create_managed_cluster(
669 &mut self,
670 session: &Session,
671 CreateClusterManagedPlan {
672 availability_zones,
673 compute,
674 replication_factor,
675 size,
676 optimizer_feature_overrides: _,
677 schedule: _,
678 }: CreateClusterManagedPlan,
679 cluster_id: ClusterId,
680 mut ops: Vec<catalog::Op>,
681 ) -> Result<ExecuteResponse, AdapterError> {
682 tracing::debug!("sequence_create_managed_cluster");
683
684 self.ensure_valid_azs(availability_zones.iter())?;
685
686 let role_id = session.role_metadata().current_role;
687 self.catalog.ensure_valid_replica_size(
688 &self
689 .catalog()
690 .get_role_allowed_cluster_sizes(&Some(role_id)),
691 &size,
692 false,
693 )?;
694
695 if cluster_id.is_user() {
700 self.validate_resource_limit(
701 0,
702 i64::from(replication_factor),
703 SystemVars::max_replicas_per_cluster,
704 "cluster replica",
705 MAX_REPLICAS_PER_CLUSTER.name(),
706 )?;
707 }
708
709 let id_ts = self.get_catalog_write_ts().await;
714 let replica_ids = self
715 .catalog()
716 .allocate_replica_ids(cluster_id, u64::from(replication_factor), id_ts)
717 .await?;
718
719 for (replica_id, replica_name) in replica_ids
720 .into_iter()
721 .zip_eq((0..replication_factor).map(managed_cluster_replica_name))
722 {
723 self.create_managed_cluster_replica_op(
724 cluster_id,
725 replica_id,
726 replica_name,
727 &compute,
728 &size,
729 &mut ops,
730 if availability_zones.is_empty() {
731 None
732 } else {
733 Some(availability_zones.as_ref())
734 },
735 false,
736 *session.current_role_id(),
737 ReplicaCreateDropReason::Manual,
738 )?;
739 }
740
741 self.catalog_transact(Some(session), ops).await?;
742
743 self.resolve_scoped_for_new_objects(&BTreeSet::from([cluster_id]), &BTreeSet::new())
754 .await;
755
756 Ok(ExecuteResponse::CreatedCluster)
757 }
758
759 fn create_managed_cluster_replica_op(
760 &self,
761 cluster_id: ClusterId,
762 replica_id: ReplicaId,
763 name: String,
764 compute: &mz_sql::plan::ComputeReplicaConfig,
765 size: &String,
766 ops: &mut Vec<Op>,
767 azs: Option<&[String]>,
768 pending: bool,
769 owner_id: RoleId,
770 reason: ReplicaCreateDropReason,
771 ) -> Result<(), AdapterError> {
772 let location = mz_catalog::durable::ReplicaLocation::Managed {
773 availability_zones: Vec::new(),
776 billed_as: None,
777 internal: false,
778 size: size.clone(),
779 pending,
780 };
781
782 let logging = if let Some(config) = compute.introspection {
783 ReplicaLogging {
784 log_logging: config.debugging,
785 interval: Some(config.interval),
786 }
787 } else {
788 ReplicaLogging::default()
789 };
790
791 let config = ReplicaConfig {
792 location: self.catalog().concretize_replica_location(
793 location,
794 &self
795 .catalog()
796 .get_role_allowed_cluster_sizes(&Some(owner_id)),
797 azs,
798 false,
799 )?,
800 compute: ComputeReplicaConfig { logging },
801 };
802
803 ops.push(catalog::Op::CreateClusterReplica {
807 cluster_id,
808 replica_id,
809 name,
810 config,
811 owner_id,
812 reason,
813 });
814 Ok(())
815 }
816
817 fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
818 &self,
819 azs: I,
820 ) -> Result<(), AdapterError> {
821 let cat_azs = self.catalog().state().availability_zones();
822 for az in azs.into_iter() {
823 if !cat_azs.contains(az) {
824 return Err(AdapterError::InvalidClusterReplicaAz {
825 az: az.to_string(),
826 expected: cat_azs.to_vec(),
827 });
828 }
829 }
830 Ok(())
831 }
832
833 #[mz_ore::instrument(level = "debug")]
834 async fn sequence_create_unmanaged_cluster(
835 &mut self,
836 session: &Session,
837 CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
838 id: ClusterId,
839 mut ops: Vec<catalog::Op>,
840 ) -> Result<ExecuteResponse, AdapterError> {
841 tracing::debug!("sequence_create_unmanaged_cluster");
842
843 self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
844 if let mz_sql::plan::ReplicaConfig::Orchestrated {
845 availability_zone: Some(az),
846 ..
847 } = &r
848 {
849 Some(az)
850 } else {
851 None
852 }
853 }))?;
854
855 if id.is_user() {
860 self.validate_resource_limit(
861 0,
862 i64::try_from(replicas.len()).unwrap_or(i64::MAX),
863 SystemVars::max_replicas_per_cluster,
864 "cluster replica",
865 MAX_REPLICAS_PER_CLUSTER.name(),
866 )?;
867 }
868
869 let id_ts = self.get_catalog_write_ts().await;
874 let replica_ids = self
875 .catalog()
876 .allocate_replica_ids(id, u64::cast_from(replicas.len()), id_ts)
877 .await?;
878
879 for (replica_id, (replica_name, replica_config)) in replica_ids.into_iter().zip_eq(replicas)
880 {
881 let (compute, location) = match replica_config {
884 mz_sql::plan::ReplicaConfig::Unorchestrated {
885 storagectl_addrs,
886 computectl_addrs,
887 compute,
888 } => {
889 let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
890 storagectl_addrs,
891 computectl_addrs,
892 };
893 (compute, location)
894 }
895 mz_sql::plan::ReplicaConfig::Orchestrated {
896 availability_zone,
897 billed_as,
898 compute,
899 internal,
900 size,
901 } => {
902 if !session.user().is_internal() && (internal || billed_as.is_some()) {
904 coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
905 }
906 if billed_as.is_some() && !internal {
908 coord_bail!("must specify INTERNAL when specifying BILLED AS");
909 }
910
911 let location = mz_catalog::durable::ReplicaLocation::Managed {
912 availability_zones: availability_zone.into_iter().collect(),
915 billed_as,
916 internal,
917 size: size.clone(),
918 pending: false,
919 };
920 (compute, location)
921 }
922 };
923
924 let logging = if let Some(config) = compute.introspection {
925 ReplicaLogging {
926 log_logging: config.debugging,
927 interval: Some(config.interval),
928 }
929 } else {
930 ReplicaLogging::default()
931 };
932
933 let role_id = session.role_metadata().current_role;
934 let config = ReplicaConfig {
935 location: self.catalog().concretize_replica_location(
936 location,
937 &self
938 .catalog()
939 .get_role_allowed_cluster_sizes(&Some(role_id)),
940 None,
941 false,
942 )?,
943 compute: ComputeReplicaConfig { logging },
944 };
945
946 ops.push(catalog::Op::CreateClusterReplica {
947 cluster_id: id,
948 replica_id,
949 name: replica_name.clone(),
950 config,
951 owner_id: *session.current_role_id(),
952 reason: ReplicaCreateDropReason::Manual,
953 });
954 }
955
956 self.catalog_transact(Some(session), ops).await?;
957
958 self.resolve_scoped_for_new_objects(&BTreeSet::from([id]), &BTreeSet::new())
961 .await;
962
963 Ok(ExecuteResponse::CreatedCluster)
964 }
965
966 #[mz_ore::instrument(level = "debug")]
967 pub(crate) async fn sequence_create_cluster_replica(
968 &mut self,
969 session: &Session,
970 CreateClusterReplicaPlan {
971 name,
972 cluster_id,
973 config,
974 }: CreateClusterReplicaPlan,
975 ) -> Result<ExecuteResponse, AdapterError> {
976 let (compute, location) = match config {
978 mz_sql::plan::ReplicaConfig::Unorchestrated {
979 storagectl_addrs,
980 computectl_addrs,
981 compute,
982 } => {
983 let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
984 storagectl_addrs,
985 computectl_addrs,
986 };
987 (compute, location)
988 }
989 mz_sql::plan::ReplicaConfig::Orchestrated {
990 availability_zone,
991 billed_as,
992 compute,
993 internal,
994 size,
995 } => {
996 let availability_zone = match availability_zone {
997 Some(az) => {
998 self.ensure_valid_azs([&az])?;
999 Some(az)
1000 }
1001 None => None,
1002 };
1003 let location = mz_catalog::durable::ReplicaLocation::Managed {
1004 availability_zones: availability_zone.into_iter().collect(),
1007 billed_as,
1008 internal,
1009 size,
1010 pending: false,
1011 };
1012 (compute, location)
1013 }
1014 };
1015
1016 let logging = if let Some(config) = compute.introspection {
1017 ReplicaLogging {
1018 log_logging: config.debugging,
1019 interval: Some(config.interval),
1020 }
1021 } else {
1022 ReplicaLogging::default()
1023 };
1024
1025 let role_id = session.role_metadata().current_role;
1026 let config = ReplicaConfig {
1027 location: self.catalog().concretize_replica_location(
1028 location,
1029 &self
1030 .catalog()
1031 .get_role_allowed_cluster_sizes(&Some(role_id)),
1032 None,
1035 false,
1036 )?,
1037 compute: ComputeReplicaConfig { logging },
1038 };
1039
1040 let cluster = self.catalog().get_cluster(cluster_id);
1041
1042 if let ReplicaLocation::Managed(ManagedReplicaLocation {
1043 internal,
1044 billed_as,
1045 ..
1046 }) = &config.location
1047 {
1048 if !session.user().is_internal() && (*internal || billed_as.is_some()) {
1050 coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
1051 }
1052 if cluster.is_managed() && !*internal {
1054 coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
1055 }
1056 if billed_as.is_some() && !*internal {
1058 coord_bail!("must specify INTERNAL when specifying BILLED AS");
1059 }
1060 }
1061
1062 let owner_id = cluster.owner_id();
1064
1065 let id_ts = self.get_catalog_write_ts().await;
1071 let replica_id = self
1072 .catalog()
1073 .allocate_replica_ids(cluster_id, 1, id_ts)
1074 .await?
1075 .into_element();
1076
1077 let op = catalog::Op::CreateClusterReplica {
1078 cluster_id,
1079 replica_id,
1080 name: name.clone(),
1081 config,
1082 owner_id,
1083 reason: ReplicaCreateDropReason::Manual,
1084 };
1085
1086 self.catalog_transact(Some(session), vec![op]).await?;
1087
1088 Ok(ExecuteResponse::CreatedClusterReplica)
1089 }
1090
    /// Applies `new_config` to a cluster that is managed both before and after the alter.
    /// Replicas are created and/or dropped as needed, and the resulting ops are committed in
    /// one catalog transaction.
    ///
    /// If the size, availability zones, or logging changed, `new_replication_factor` new
    /// replicas are created:
    /// - With `AlterClusterPlanStrategy::None`, the existing replicas named by
    ///   `managed_cluster_replica_name` are dropped and the new ones created in their place.
    /// - With `For` / `UntilReady`, the new replicas are created as *pending* replicas whose
    ///   names carry `PENDING_REPLICA_SUFFIX`. The existing replicas are left in place, and
    ///   the cluster config is not written here.
    ///
    /// Otherwise only the replication factor is adjusted. Replicas with indices in
    /// `new_replication_factor..replication_factor` are dropped, or replicas with indices in
    /// `replication_factor..new_replication_factor` are created.
    ///
    /// `session` may be `None`. It is used for the allowed-size check and the catalog
    /// transaction.
    ///
    /// Returns `NeedsFinalization::Yes` when the `For` / `UntilReady` path was taken, and
    /// `NeedsFinalization::No` otherwise.
    ///
    /// # Errors
    /// - The new size is not allowed.
    /// - The cluster already has pending replicas (`AlterClusterWhilePendingReplicas`).
    /// - The replica resource limit or the availability-zone check fails.
    /// - Replica id allocation or the catalog transaction fails.
    ///
    /// # Panics
    /// Panics if either the existing or the new cluster config is not managed.
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            optimizer_feature_overrides: _,
            schedule: _,
            auto_scaling_strategy: _,
            reconfiguration: _,
            burst: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        // Copy the current settings out of the catalog entry; they are compared against the
        // new config below.
        let size = size.clone();
        let availability_zones = availability_zones.clone();
        let logging = logging.clone();
        let replication_factor = *replication_factor;
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            optimizer_feature_overrides: _,
            schedule: _,
            auto_scaling_strategy: _,
            reconfiguration: _,
            burst: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
            false,
        )?;

        // Refuse to start a new alter while pending replicas from an earlier one still exist.
        if cluster.replicas().any(|r| r.config.location.pending()) {
            return Err(AlterClusterWhilePendingReplicas);
        }

        // Lets the drop paths below resolve managed replica names to replica ids.
        let replica_id_by_name: BTreeMap<String, ReplicaId> = cluster
            .replicas()
            .map(|r| (r.name.clone(), r.replica_id))
            .collect();

        // Introspection is configured only when the new logging config has an interval.
        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        // Only growth of the replication factor is checked against the per-cluster replica
        // limit, and only for user clusters.
        if *new_replication_factor > replication_factor {
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(replication_factor),
                    i64::from(*new_replication_factor) - i64::from(replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        // A changed size, availability zones, or logging config recreates every replica;
        // otherwise only the additional replicas (if any) need new ids.
        let config_changed = new_size != &size
            || new_availability_zones != &availability_zones
            || new_logging != &logging;
        let needed_replica_ids = if config_changed {
            *new_replication_factor
        } else if *new_replication_factor > replication_factor {
            *new_replication_factor - replication_factor
        } else {
            0
        };
        // Ids are allocated up front and consumed one per replica created below.
        let mut new_replica_ids = if needed_replica_ids > 0 {
            let id_ts = self.get_catalog_write_ts().await;
            self.catalog()
                .allocate_replica_ids(cluster_id, u64::from(needed_replica_ids), id_ts)
                .await?
                .into_iter()
        } else {
            Vec::<ReplicaId>::new().into_iter()
        };

        if config_changed {
            self.ensure_valid_azs(new_availability_zones.iter())?;
            match strategy {
                // Immediate reconfiguration: drop the current managed replicas and create
                // replacements under the same names.
                AlterClusterPlanStrategy::None => {
                    let replica_ids_and_reasons = (0..replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| replica_id_by_name.get(&name).copied())
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster_id,
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let replica_id = new_replica_ids
                            .next()
                            .expect("pre-allocated enough replica ids");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            replica_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                }
                // Graceful reconfiguration: create suffixed pending replicas alongside the
                // existing ones; a later finalize step promotes them.
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        let replica_id = new_replica_ids
                            .next()
                            .expect("pre-allocated enough replica ids");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            replica_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if *new_replication_factor < replication_factor {
            // Shrink: drop the replicas with indices new_replication_factor..replication_factor.
            let replica_ids = (*new_replication_factor..replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| replica_id_by_name.get(&name).copied())
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster_id,
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if *new_replication_factor > replication_factor {
            // Grow: create the replicas with indices replication_factor..new_replication_factor.
            for name in
                (replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                let replica_id = new_replica_ids
                    .next()
                    .expect("pre-allocated enough replica ids");
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    replica_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    Some(new_availability_zones.as_ref()),
                    false,
                    owner_id,
                    reason.clone(),
                )?;
            }
        }

        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name: name.clone(),
                    config: new_config,
                });
            }
            // The cluster config is written later, when the pending replicas are finalized.
            NeedsFinalization::Yes => {}
        }
        self.catalog_transact(session, ops).await?;
        Ok(finalization_needed)
    }
1339
1340 async fn sequence_alter_cluster_unmanaged_to_managed(
1344 &mut self,
1345 session: &Session,
1346 cluster_id: ClusterId,
1347 mut new_config: ClusterConfig,
1348 options: PlanClusterOption,
1349 ) -> Result<(), AdapterError> {
1350 let cluster = self.catalog.get_cluster(cluster_id);
1351 let cluster_name = cluster.name().to_string();
1352
1353 let ClusterVariant::Managed(ClusterVariantManaged {
1354 size: new_size,
1355 replication_factor: new_replication_factor,
1356 availability_zones: new_availability_zones,
1357 logging: _,
1358 optimizer_feature_overrides: _,
1359 schedule: _,
1360 auto_scaling_strategy: _,
1361 reconfiguration: _,
1362 burst: _,
1363 }) = &mut new_config.variant
1364 else {
1365 panic!("expected new managed cluster config");
1366 };
1367
1368 let user_replica_count = cluster
1370 .user_replicas()
1371 .count()
1372 .try_into()
1373 .expect("must_fit");
1374 match options.replication_factor {
1375 AlterOptionParameter::Set(_) => {
1376 if user_replica_count != *new_replication_factor {
1378 coord_bail!(
1379 "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
1380 );
1381 }
1382 }
1383 _ => {
1384 *new_replication_factor = user_replica_count;
1385 }
1386 }
1387
1388 let mut names = BTreeSet::new();
1389 let mut sizes = BTreeSet::new();
1390
1391 self.ensure_valid_azs(new_availability_zones.iter())?;
1392
1393 for replica in cluster.user_replicas() {
1395 names.insert(replica.name.clone());
1396 match &replica.config.location {
1397 ReplicaLocation::Unmanaged(_) => coord_bail!(
1398 "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
1399 ),
1400 ReplicaLocation::Managed(location) => {
1401 sizes.insert(location.size.clone());
1402
1403 for az in &location.availability_zones {
1407 if !new_availability_zones.contains(az) {
1408 coord_bail!(
1409 "unmanaged replica has availability zone {az} which is not \
1410 in managed {new_availability_zones:?}"
1411 )
1412 }
1413 }
1414 }
1415 }
1416 }
1417
1418 if sizes.is_empty() {
1419 assert!(
1420 cluster.user_replicas().next().is_none(),
1421 "Cluster should not have replicas"
1422 );
1423 match &options.size {
1425 AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
1426 coord_bail!("Missing SIZE for empty cluster")
1427 }
1428 AlterOptionParameter::Set(_) => {} }
1430 } else if sizes.len() == 1 {
1431 let size = sizes.into_iter().next().expect("must exist");
1432 match &options.size {
1433 AlterOptionParameter::Set(sz) if *sz != size => {
1434 coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
1435 }
1436 _ => *new_size = size,
1437 }
1438 } else {
1439 let formatted = sizes
1440 .iter()
1441 .map(String::as_str)
1442 .collect::<Vec<_>>()
1443 .join(", ");
1444 coord_bail!(
1445 "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
1446 );
1447 }
1448
1449 for i in 0..*new_replication_factor {
1450 let name = managed_cluster_replica_name(i);
1451 names.remove(&name);
1452 }
1453 if !names.is_empty() {
1454 let formatted = names
1455 .iter()
1456 .map(String::as_str)
1457 .collect::<Vec<_>>()
1458 .join(", ");
1459 coord_bail!(
1460 "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
1461 );
1462 }
1463
1464 let ops = vec![catalog::Op::UpdateClusterConfig {
1465 id: cluster_id,
1466 name: cluster_name,
1467 config: new_config,
1468 }];
1469
1470 self.catalog_transact(Some(session), ops).await?;
1471 Ok(())
1472 }
1473
1474 async fn sequence_alter_cluster_managed_to_unmanaged(
1475 &mut self,
1476 session: &Session,
1477 cluster_id: ClusterId,
1478 new_config: ClusterConfig,
1479 ) -> Result<(), AdapterError> {
1480 let cluster = self.catalog().get_cluster(cluster_id);
1481
1482 let ops = vec![catalog::Op::UpdateClusterConfig {
1483 id: cluster_id,
1484 name: cluster.name().to_string(),
1485 config: new_config,
1486 }];
1487
1488 self.catalog_transact(Some(session), ops).await?;
1489 Ok(())
1490 }
1491
1492 async fn sequence_alter_cluster_unmanaged_to_unmanaged(
1493 &mut self,
1494 session: &Session,
1495 cluster_id: ClusterId,
1496 new_config: ClusterConfig,
1497 replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
1498 ) -> Result<(), AdapterError> {
1499 if !matches!(replicas, AlterOptionParameter::Unchanged) {
1500 coord_bail!("Cannot alter replicas in unmanaged cluster");
1501 }
1502
1503 let cluster = self.catalog().get_cluster(cluster_id);
1504
1505 let ops = vec![catalog::Op::UpdateClusterConfig {
1506 id: cluster_id,
1507 name: cluster.name().to_string(),
1508 config: new_config,
1509 }];
1510
1511 self.catalog_transact(Some(session), ops).await?;
1512 Ok(())
1513 }
1514
1515 pub(crate) async fn sequence_alter_cluster_rename(
1516 &mut self,
1517 ctx: &mut ExecuteContext,
1518 AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
1519 ) -> Result<ExecuteResponse, AdapterError> {
1520 let op = Op::RenameCluster {
1521 id,
1522 name,
1523 to_name,
1524 check_reserved_names: true,
1525 };
1526 match self
1527 .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
1528 .await
1529 {
1530 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1531 Err(err) => Err(err),
1532 }
1533 }
1534
1535 pub(crate) async fn sequence_alter_cluster_swap(
1536 &mut self,
1537 ctx: &mut ExecuteContext,
1538 AlterClusterSwapPlan {
1539 id_a,
1540 id_b,
1541 name_a,
1542 name_b,
1543 name_temp,
1544 }: AlterClusterSwapPlan,
1545 ) -> Result<ExecuteResponse, AdapterError> {
1546 let op_a = Op::RenameCluster {
1547 id: id_a,
1548 name: name_a.clone(),
1549 to_name: name_temp.clone(),
1550 check_reserved_names: false,
1551 };
1552 let op_b = Op::RenameCluster {
1553 id: id_b,
1554 name: name_b.clone(),
1555 to_name: name_a,
1556 check_reserved_names: false,
1557 };
1558 let op_temp = Op::RenameCluster {
1559 id: id_a,
1560 name: name_temp,
1561 to_name: name_b,
1562 check_reserved_names: false,
1563 };
1564
1565 match self
1566 .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
1567 Box::pin(async {})
1568 })
1569 .await
1570 {
1571 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
1572 Err(err) => Err(err),
1573 }
1574 }
1575
1576 pub(crate) async fn sequence_alter_cluster_replica_rename(
1577 &mut self,
1578 session: &Session,
1579 AlterClusterReplicaRenamePlan {
1580 cluster_id,
1581 replica_id,
1582 name,
1583 to_name,
1584 }: AlterClusterReplicaRenamePlan,
1585 ) -> Result<ExecuteResponse, AdapterError> {
1586 let op = catalog::Op::RenameClusterReplica {
1587 cluster_id,
1588 replica_id,
1589 name,
1590 to_name,
1591 };
1592 match self.catalog_transact(Some(session), vec![op]).await {
1593 Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
1594 Err(err) => Err(err),
1595 }
1596 }
1597
1598 pub(crate) async fn sequence_alter_set_cluster(
1600 &self,
1601 _session: &Session,
1602 AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
1603 ) -> Result<ExecuteResponse, AdapterError> {
1604 async {}.await;
1608 let entry = self.catalog().get_entry(&id);
1609 match entry.item().typ() {
1610 _ => {
1611 Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
1613 }
1614 }
1615 }
1616}
1617
/// Returns the name of the `index`-th replica of a managed cluster.
///
/// Names are 1-based: index `0` maps to `"r1"`, index `1` to `"r2"`, and so on.
/// The increment is done in `u64` so `u32::MAX` cannot overflow. A `u32`
/// increment would panic in debug builds and wrap to `"r0"` in release builds.
fn managed_cluster_replica_name(index: u32) -> String {
    format!("r{}", u64::from(index) + 1)
}
1621
/// Whether an `ALTER CLUSTER` on a managed cluster was fully applied, or was
/// staged and still has to be completed later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NeedsFinalization {
    /// The alteration was staged. Replacement replicas were created under the
    /// pending-replica suffix, and the new cluster configuration has not been
    /// written yet, so a follow-up finalization step must complete it.
    Yes,
    /// The alteration was applied in full in one catalog transaction,
    /// including the cluster configuration update.
    No,
}