use std::collections::BTreeSet;
use std::time::{Duration, Instant};

use itertools::Itertools;
use maplit::btreeset;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::builtin::BUILTINS;
use mz_catalog::memory::objects::{
    ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged,
};
use mz_compute_types::config::ComputeReplicaConfig;
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaConfig, ReplicaLocation,
    ReplicaLogging,
};
use mz_controller_types::{ClusterId, DEFAULT_REPLICA_LOGGING_INTERVAL, ReplicaId};
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_repr::role_id::RoleId;
use mz_sql::ast::{Ident, QualifiedReplica};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, ObjectType};
use mz_sql::plan::{
    self, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan,
    AlterClusterSwapPlan, AlterOptionParameter, AlterSetClusterPlan,
    ComputeReplicaIntrospectionConfig, CreateClusterManagedPlan, CreateClusterPlan,
    CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, PlanClusterOption,
};
use mz_sql::plan::{AlterClusterPlan, OnTimeoutAction};
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::{MAX_REPLICAS_PER_CLUSTER, SystemVars, Var};
use tracing::{Instrument, Span, debug};

use super::return_if_err;
use crate::catalog::{self, Op, ReplicaCreateDropReason};
use crate::coord::{
    AlterCluster, AlterClusterFinalize, AlterClusterWaitForHydrated, ClusterStage, Coordinator,
    Message, PlanValidity, StageResult, Staged,
};
use crate::{AdapterError, ExecuteContext, ExecuteResponse, session::Session};

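// Suffix appended to the names of replicas created during a graceful
// `ALTER CLUSTER` reconfiguration; it is stripped again when the pending
// replicas are promoted during finalization.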
const PENDING_REPLICA_SUFFIX: &str = "-pending";

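// `ALTER CLUSTER` runs as a small state machine: the `Alter` stage applies the
// configuration change, `WaitForHydrated` polls pending replicas until they
// are hydrated (or a timeout fires), and `Finalize` promotes the pending
// replicas and commits the new configuration.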
impl Staged for ClusterStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            Self::Alter(stage) => &mut stage.validity,
            Self::WaitForHydrated(stage) => &mut stage.validity,
            Self::Finalize(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, crate::AdapterError> {
        match self {
            Self::Alter(stage) => {
                coord
                    .sequence_alter_cluster_stage(ctx.session(), stage.plan.clone(), stage.validity)
                    .await
            }
            Self::WaitForHydrated(stage) => {
                let AlterClusterWaitForHydrated {
                    validity,
                    plan,
                    new_config,
                    timeout_time,
                    on_timeout,
                } = stage;
                coord
                    .check_if_pending_replicas_hydrated_stage(
                        ctx.session(),
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                        validity,
                    )
                    .await
            }
            Self::Finalize(stage) => {
                coord
                    .finalize_alter_cluster_stage(
                        ctx.session(),
                        stage.plan.clone(),
                        stage.new_config.clone(),
                    )
                    .await
            }
        }
    }

    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
        Message::ClusterStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
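    /// Sequences an `ALTER CLUSTER` statement by validating the plan and then
    /// driving it through the `ClusterStage` state machine.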
    #[instrument]
    pub(crate) async fn sequence_alter_cluster_staged(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::AlterClusterPlan,
    ) {
        let stage = return_if_err!(self.alter_cluster_validate(ctx.session(), plan).await, ctx);
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

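    /// Builds the initial `Alter` stage, capturing a `PlanValidity` so that
    /// later stages can detect concurrent catalog changes that would
    /// invalidate the plan.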
    #[instrument]
    async fn alter_cluster_validate(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
    ) -> Result<ClusterStage, AdapterError> {
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            BTreeSet::new(),
            Some(plan.id.clone()),
            None,
            session.role_metadata().clone(),
        );
        Ok(ClusterStage::Alter(AlterCluster { validity, plan }))
    }

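    /// Merges the requested `ALTER CLUSTER` options into the cluster's current
    /// configuration and sequences the resulting change. Returns a follow-up
    /// stage when the alter strategy requires waiting for pending replicas to
    /// hydrate or finalizing them later.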
    async fn sequence_alter_cluster_stage(
        &mut self,
        session: &Session,
        plan: plan::AlterClusterPlan,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let AlterClusterPlan {
            id: cluster_id,
            name: _,
            ref options,
            ref strategy,
        } = plan;

        use mz_catalog::memory::objects::ClusterVariant::*;
        use mz_sql::plan::AlterOptionParameter::*;
        let cluster = self.catalog.get_cluster(cluster_id);
        let config = cluster.config.clone();
        let mut new_config = config.clone();

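        // Apply the MANAGED option first: it may flip the cluster between the
        // managed and unmanaged variants before the remaining options are
        // merged below.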
        match (&new_config.variant, &options.managed) {
            (Managed(_), Reset) | (Managed(_), Unchanged) | (Managed(_), Set(true)) => {}
            (Managed(_), Set(false)) => new_config.variant = Unmanaged,
            (Unmanaged, Unchanged) | (Unmanaged, Set(false)) => {}
            (Unmanaged, Reset) | (Unmanaged, Set(true)) => {
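                // Placeholder size: a real size must be supplied via the SIZE
                // option below or derived from the existing replicas when
                // converting to managed; otherwise validation fails later.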
                let size = "".to_string();
                let logging = ReplicaLogging {
                    log_logging: false,
                    interval: Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                };
                new_config.variant = Managed(ClusterVariantManaged {
                    size,
                    availability_zones: Default::default(),
                    logging,
                    replication_factor: 1,
                    optimizer_feature_overrides: Default::default(),
                    schedule: Default::default(),
                });
            }
        }

        match &mut new_config.variant {
            Managed(ClusterVariantManaged {
                size,
                availability_zones,
                logging,
                replication_factor,
                optimizer_feature_overrides: _,
                schedule,
            }) => {
                match &options.size {
                    Set(s) => size.clone_from(s),
                    Reset => coord_bail!("SIZE has no default value"),
                    Unchanged => {}
                }
                match &options.availability_zones {
                    Set(az) => availability_zones.clone_from(az),
                    Reset => *availability_zones = Default::default(),
                    Unchanged => {}
                }
                match &options.introspection_debugging {
                    Set(id) => logging.log_logging = *id,
                    Reset => logging.log_logging = false,
                    Unchanged => {}
                }
                match &options.introspection_interval {
                    Set(ii) => logging.interval = ii.0,
                    Reset => logging.interval = Some(DEFAULT_REPLICA_LOGGING_INTERVAL),
                    Unchanged => {}
                }
                match &options.replication_factor {
                    Set(rf) => *replication_factor = *rf,
                    Reset => {
                        *replication_factor = self
                            .catalog
                            .system_config()
                            .default_cluster_replication_factor()
                    }
                    Unchanged => {}
                }
                match &options.schedule {
                    Set(new_schedule) => {
                        *schedule = new_schedule.clone();
                    }
                    Reset => *schedule = Default::default(),
                    Unchanged => {}
                }
                if !matches!(options.replicas, Unchanged) {
                    coord_bail!("Cannot change REPLICAS of managed clusters");
                }
            }
            Unmanaged => {
                if !matches!(options.size, Unchanged) {
                    coord_bail!("Cannot change SIZE of unmanaged clusters");
                }
                if !matches!(options.availability_zones, Unchanged) {
                    coord_bail!("Cannot change AVAILABILITY ZONES of unmanaged clusters");
                }
                if !matches!(options.introspection_debugging, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION DEBUGGING of unmanaged clusters");
                }
                if !matches!(options.introspection_interval, Unchanged) {
                    coord_bail!("Cannot change INTROSPECTION INTERVAL of unmanaged clusters");
                }
                if !matches!(options.replication_factor, Unchanged) {
                    coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
                }
            }
        }

        match &options.workload_class {
            Set(wc) => new_config.workload_class.clone_from(wc),
            Reset => new_config.workload_class = None,
            Unchanged => {}
        }

        if new_config == config {
            return Ok(StageResult::Response(ExecuteResponse::AlteredObject(
                ObjectType::Cluster,
            )));
        }

        let new_workload_class = new_config.workload_class.clone();
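        // Dispatch on the (old, new) variant pair: managed-to-managed changes
        // may need a graceful reconfiguration via pending replicas, while the
        // other transitions commit in a single catalog transaction.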
        match (&config.variant, &new_config.variant) {
            (Managed(_), Managed(new_config_managed)) => {
                let alter_followup = self
                    .sequence_alter_cluster_managed_to_managed(
                        Some(session),
                        cluster_id,
                        new_config.clone(),
                        ReplicaCreateDropReason::Manual,
                        strategy.clone(),
                    )
                    .await?;
                if alter_followup == NeedsFinalization::Yes {
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .insert(cluster_id.clone());
                    let new_config_managed = new_config_managed.clone();
                    return match &strategy {
                        AlterClusterPlanStrategy::None => Err(AdapterError::Internal(
                            "AlterClusterPlanStrategy must not be None if NeedsFinalization is Yes"
                                .into(),
                        )),
                        AlterClusterPlanStrategy::For(duration) => {
                            let span = Span::current();
                            let plan = plan.clone();
                            let duration = duration.to_owned();
                            Ok(StageResult::Handle(mz_ore::task::spawn(
                                || "Finalize Alter Cluster",
                                async move {
                                    tokio::time::sleep(duration).await;
                                    let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                        validity,
                                        plan,
                                        new_config: new_config_managed,
                                    });
                                    Ok(Box::new(stage))
                                }
                                .instrument(span),
                            )))
                        }
                        AlterClusterPlanStrategy::UntilReady {
                            timeout,
                            on_timeout,
                        } => Ok(StageResult::Immediate(Box::new(
                            ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                                validity,
                                plan: plan.clone(),
                                new_config: new_config_managed.clone(),
                                timeout_time: Instant::now() + timeout.to_owned(),
                                on_timeout: on_timeout.to_owned(),
                            }),
                        ))),
                    };
                }
            }
            (Unmanaged, Managed(_)) => {
                self.sequence_alter_cluster_unmanaged_to_managed(
                    session,
                    cluster_id,
                    new_config,
                    options.to_owned(),
                )
                .await?;
            }
            (Managed(_), Unmanaged) => {
                self.sequence_alter_cluster_managed_to_unmanaged(session, cluster_id, new_config)
                    .await?;
            }
            (Unmanaged, Unmanaged) => {
                self.sequence_alter_cluster_unmanaged_to_unmanaged(
                    session,
                    cluster_id,
                    new_config,
                    options.replicas.clone(),
                )
                .await?;
            }
        }

        self.controller
            .update_cluster_workload_class(cluster_id, new_workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

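    /// Finalizes a graceful `ALTER CLUSTER`: drops the old non-pending,
    /// non-internal replicas, promotes the pending replicas by clearing their
    /// pending flag and stripping the `-pending` name suffix, and commits the
    /// new cluster configuration.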
    async fn finalize_alter_cluster_stage(
        &mut self,
        session: &Session,
        AlterClusterPlan {
            id: cluster_id,
            name: cluster_name,
            ..
        }: AlterClusterPlan,
        new_config: ClusterVariantManaged,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let workload_class = cluster.config.workload_class.clone();
        let mut ops = vec![];

        // Drop the old replicas; pending and internal replicas are kept.
        let remove_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if !r.config.location.pending() && !r.config.location.internal() {
                    Some(catalog::DropObjectInfo::ClusterReplica((
                        cluster_id.clone(),
                        r.replica_id,
                        ReplicaCreateDropReason::Manual,
                    )))
                } else {
                    None
                }
            })
            .collect();
        ops.push(catalog::Op::DropObjects(remove_replicas));

        // Promote the pending replicas: rename them to their final names and
        // clear their pending flags.
        let finalize_replicas: Vec<catalog::Op> = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    let cluster_ident = match Ident::new(cluster.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing cluster name",
                                err,
                            )));
                        }
                    };
                    let replica_ident = match Ident::new(r.name.clone()) {
                        Ok(id) => id,
                        Err(err) => {
                            return Some(Err(AdapterError::internal(
                                "Unexpected error parsing replica name",
                                err,
                            )));
                        }
                    };
                    Some(Ok((cluster_ident, replica_ident, r)))
                } else {
                    None
                }
            })
            .collect::<Result<Vec<(Ident, Ident, &ClusterReplica)>, _>>()?
            .into_iter()
            .flat_map(|(cluster_ident, replica_ident, replica)| {
                let mut new_replica_config = replica.config.clone();
                debug!("Promoting replica: {}", replica.name);
                if let ReplicaLocation::Managed(ManagedReplicaLocation {
                    ref mut pending, ..
                }) = new_replica_config.location
                {
                    *pending = false;
                }

                let mut replica_ops = vec![];
                let to_name = replica.name.strip_suffix(PENDING_REPLICA_SUFFIX);
                if let Some(to_name) = to_name {
                    replica_ops.push(catalog::Op::RenameClusterReplica {
                        cluster_id: cluster_id.clone(),
                        replica_id: replica.replica_id.to_owned(),
                        name: QualifiedReplica {
                            cluster: cluster_ident,
                            replica: replica_ident,
                        },
                        to_name: to_name.to_owned(),
                    });
                }
                replica_ops.push(catalog::Op::UpdateClusterReplicaConfig {
                    cluster_id,
                    replica_id: replica.replica_id.to_owned(),
                    config: new_replica_config,
                });
                replica_ops
            })
            .collect();

        ops.extend(finalize_replicas);

        ops.push(Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: ClusterConfig {
                variant: ClusterVariant::Managed(new_config),
                workload_class: workload_class.clone(),
            },
        });
        self.catalog_transact(Some(session), ops).await?;
        self.active_conns
            .get_mut(session.conn_id())
            .expect("There must be an active connection")
            .pending_cluster_alters
            .remove(&cluster_id);

        self.controller
            .update_cluster_workload_class(cluster_id, workload_class)?;

        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
            ObjectType::Cluster,
        )))
    }

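    /// Checks whether all pending replicas of the altered cluster are
    /// hydrated. If so, proceeds to finalization; if not, reschedules itself.
    /// Once the timeout elapses, the alter is rolled back or committed
    /// according to the `OnTimeoutAction`.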
    async fn check_if_pending_replicas_hydrated_stage(
        &mut self,
        session: &Session,
        plan: AlterClusterPlan,
        new_config: ClusterVariantManaged,
        timeout_time: Instant,
        on_timeout: OnTimeoutAction,
        validity: PlanValidity,
    ) -> Result<StageResult<Box<ClusterStage>>, AdapterError> {
        let cluster = self.catalog.get_cluster(plan.id);
        let pending_replicas = cluster
            .replicas()
            .filter_map(|r| {
                if r.config.location.pending() {
                    Some(r.replica_id.clone())
                } else {
                    None
                }
            })
            .collect_vec();
        if Instant::now() > timeout_time {
            match on_timeout {
                OnTimeoutAction::Rollback => {
                    self.active_conns
                        .get_mut(session.conn_id())
                        .expect("There must be an active connection")
                        .pending_cluster_alters
                        .remove(&cluster.id);
                    self.drop_reconfiguration_replicas(btreeset!(cluster.id))
                        .await?;
                    return Err(AdapterError::AlterClusterTimeout);
                }
                OnTimeoutAction::Commit => {
                    let span = Span::current();
                    let poll_duration = self
                        .catalog
                        .system_config()
                        .cluster_alter_check_ready_interval()
                        .clone();
                    return Ok(StageResult::Handle(mz_ore::task::spawn(
                        || "Finalize Alter Cluster",
                        async move {
                            tokio::time::sleep(poll_duration).await;
                            let stage = ClusterStage::Finalize(AlterClusterFinalize {
                                validity,
                                plan,
                                new_config,
                            });
                            Ok(Box::new(stage))
                        }
                        .instrument(span),
                    )));
                }
            }
        }
        let compute_hydrated_fut = self
            .controller
            .compute
            .collections_hydrated_for_replicas(cluster.id, pending_replicas.clone(), [].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let storage_hydrated = self
            .controller
            .storage
            .collections_hydrated_on_replicas(Some(pending_replicas), &cluster.id, &[].into())
            .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn(
            || "Alter Cluster: wait for hydrated",
            async move {
                let compute_hydrated = compute_hydrated_fut
                    .await
                    .map_err(|e| AdapterError::internal("Failed to check hydration", e))?;

                if compute_hydrated && storage_hydrated {
                    // All pending replicas are hydrated; proceed to finalize.
                    Ok(Box::new(ClusterStage::Finalize(AlterClusterFinalize {
                        validity,
                        plan,
                        new_config,
                    })))
                } else {
                    // Not yet hydrated; sleep briefly and re-enter this stage.
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    let stage = ClusterStage::WaitForHydrated(AlterClusterWaitForHydrated {
                        validity,
                        plan,
                        new_config,
                        timeout_time,
                        on_timeout,
                    });
                    Ok(Box::new(stage))
                }
            }
            .instrument(span),
        )))
    }

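    /// Sequences a `CREATE CLUSTER` statement, allocating a cluster ID and
    /// dispatching to the managed or unmanaged creation path.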
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster(
        &mut self,
        session: &Session,
        CreateClusterPlan {
            name,
            variant,
            workload_class,
        }: CreateClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_cluster");

        let id_ts = self.get_catalog_write_ts().await;
        let id = self.catalog_mut().allocate_user_cluster_id(id_ts).await?;
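        // Every cluster is created with the full set of builtin log sources,
        // which back its introspection relations.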
        let introspection_sources = BUILTINS::logs().collect();
        let cluster_variant = match &variant {
            CreateClusterVariant::Managed(plan) => {
                let logging = if let Some(config) = plan.compute.introspection {
                    ReplicaLogging {
                        log_logging: config.debugging,
                        interval: Some(config.interval),
                    }
                } else {
                    ReplicaLogging::default()
                };
                ClusterVariant::Managed(ClusterVariantManaged {
                    size: plan.size.clone(),
                    availability_zones: plan.availability_zones.clone(),
                    logging,
                    replication_factor: plan.replication_factor,
                    optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
                    schedule: plan.schedule.clone(),
                })
            }
            CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
        };
        let config = ClusterConfig {
            variant: cluster_variant,
            workload_class,
        };
        let ops = vec![catalog::Op::CreateCluster {
            id,
            name: name.clone(),
            introspection_sources,
            owner_id: *session.current_role_id(),
            config,
        }];

        match variant {
            CreateClusterVariant::Managed(plan) => {
                self.sequence_create_managed_cluster(session, plan, id, ops)
                    .await
            }
            CreateClusterVariant::Unmanaged(plan) => {
                self.sequence_create_unmanaged_cluster(session, plan, id, ops)
                    .await
            }
        }
    }

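    /// Creates a managed cluster: validates the requested size and the replica
    /// limit, then creates `replication_factor` replicas named by
    /// `managed_cluster_replica_name` (`r1`, `r2`, ...).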
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_managed_cluster(
        &mut self,
        session: &Session,
        CreateClusterManagedPlan {
            availability_zones,
            compute,
            replication_factor,
            size,
            optimizer_feature_overrides: _,
            schedule: _,
        }: CreateClusterManagedPlan,
        cluster_id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_managed_cluster");

        self.ensure_valid_azs(availability_zones.iter())?;

        let role_id = session.role_metadata().current_role;
        self.catalog.ensure_valid_replica_size(
            &self
                .catalog()
                .get_role_allowed_cluster_sizes(&Some(role_id)),
            &size,
        )?;

        if cluster_id.is_user() {
            // Only user clusters count against the per-cluster replica limit.
            self.validate_resource_limit(
                0,
                i64::from(replication_factor),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for replica_name in (0..replication_factor).map(managed_cluster_replica_name) {
            self.create_managed_cluster_replica_op(
                cluster_id,
                replica_name,
                &compute,
                &size,
                &mut ops,
                if availability_zones.is_empty() {
                    None
                } else {
                    Some(availability_zones.as_ref())
                },
                false,
                *session.current_role_id(),
                ReplicaCreateDropReason::Manual,
            )?;
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(cluster_id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

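    /// Pushes a `CreateClusterReplica` op for a single managed replica onto
    /// `ops`, concretizing its size, availability zone, and logging
    /// configuration.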
    fn create_managed_cluster_replica_op(
        &self,
        cluster_id: ClusterId,
        name: String,
        compute: &mz_sql::plan::ComputeReplicaConfig,
        size: &String,
        ops: &mut Vec<Op>,
        azs: Option<&[String]>,
        pending: bool,
        owner_id: RoleId,
        reason: ReplicaCreateDropReason,
    ) -> Result<(), AdapterError> {
        let location = mz_catalog::durable::ReplicaLocation::Managed {
            availability_zone: None,
            billed_as: None,
            internal: false,
            size: size.clone(),
            pending,
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(owner_id)),
                azs,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        ops.push(catalog::Op::CreateClusterReplica {
            cluster_id,
            name,
            config,
            owner_id,
            reason,
        });
        Ok(())
    }

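    /// Returns an error if any of the given availability zones is not known to
    /// the catalog.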
    fn ensure_valid_azs<'a, I: IntoIterator<Item = &'a String>>(
        &self,
        azs: I,
    ) -> Result<(), AdapterError> {
        let cat_azs = self.catalog().state().availability_zones();
        for az in azs.into_iter() {
            if !cat_azs.contains(az) {
                return Err(AdapterError::InvalidClusterReplicaAz {
                    az: az.to_string(),
                    expected: cat_azs.to_vec(),
                });
            }
        }
        Ok(())
    }

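    /// Creates an unmanaged cluster from an explicit list of replica
    /// definitions, validating availability zones, the replica limit, and
    /// internal-only options along the way.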
    #[mz_ore::instrument(level = "debug")]
    async fn sequence_create_unmanaged_cluster(
        &mut self,
        session: &Session,
        CreateClusterUnmanagedPlan { replicas }: CreateClusterUnmanagedPlan,
        id: ClusterId,
        mut ops: Vec<catalog::Op>,
    ) -> Result<ExecuteResponse, AdapterError> {
        tracing::debug!("sequence_create_unmanaged_cluster");

        self.ensure_valid_azs(replicas.iter().filter_map(|(_, r)| {
            if let mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone: Some(az),
                ..
            } = &r
            {
                Some(az)
            } else {
                None
            }
        }))?;

        if id.is_user() {
            // Only user clusters count against the per-cluster replica limit.
            self.validate_resource_limit(
                0,
                i64::try_from(replicas.len()).unwrap_or(i64::MAX),
                SystemVars::max_replicas_per_cluster,
                "cluster replica",
                MAX_REPLICAS_PER_CLUSTER.name(),
            )?;
        }

        for (replica_name, replica_config) in replicas {
            let (compute, location) = match replica_config {
                mz_sql::plan::ReplicaConfig::Unorchestrated {
                    storagectl_addrs,
                    computectl_addrs,
                    compute,
                } => {
                    let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                        storagectl_addrs,
                        computectl_addrs,
                    };
                    (compute, location)
                }
                mz_sql::plan::ReplicaConfig::Orchestrated {
                    availability_zone,
                    billed_as,
                    compute,
                    internal,
                    size,
                } => {
                    if !session.user().is_internal() && (internal || billed_as.is_some()) {
                        coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
                    }
                    if billed_as.is_some() && !internal {
                        coord_bail!("must specify INTERNAL when specifying BILLED AS");
                    }

                    let location = mz_catalog::durable::ReplicaLocation::Managed {
                        availability_zone,
                        billed_as,
                        internal,
                        size: size.clone(),
                        pending: false,
                    };
                    (compute, location)
                }
            };

            let logging = if let Some(config) = compute.introspection {
                ReplicaLogging {
                    log_logging: config.debugging,
                    interval: Some(config.interval),
                }
            } else {
                ReplicaLogging::default()
            };

            let role_id = session.role_metadata().current_role;
            let config = ReplicaConfig {
                location: self.catalog().concretize_replica_location(
                    location,
                    &self
                        .catalog()
                        .get_role_allowed_cluster_sizes(&Some(role_id)),
                    None,
                )?,
                compute: ComputeReplicaConfig { logging },
            };

            ops.push(catalog::Op::CreateClusterReplica {
                cluster_id: id,
                name: replica_name.clone(),
                config,
                owner_id: *session.current_role_id(),
                reason: ReplicaCreateDropReason::Manual,
            });
        }

        self.catalog_transact(Some(session), ops).await?;

        self.create_cluster(id).await;

        Ok(ExecuteResponse::CreatedCluster)
    }

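    /// Registers a newly cataloged cluster with the controller, boots its
    /// replicas, and initializes read policies for its introspection indexes.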
    async fn create_cluster(&mut self, cluster_id: ClusterId) {
        let Coordinator {
            catalog,
            controller,
            ..
        } = self;
        let cluster = catalog.get_cluster(cluster_id);
        let cluster_id = cluster.id;
        let introspection_source_ids: Vec<_> =
            cluster.log_indexes.iter().map(|(_, id)| *id).collect();

        controller
            .create_cluster(
                cluster_id,
                mz_controller::clusters::ClusterConfig {
                    arranged_logs: cluster.log_indexes.clone(),
                    workload_class: cluster.config.workload_class.clone(),
                },
            )
            .expect("creating cluster must not fail");

        let replica_ids: Vec<_> = cluster
            .replicas()
            .map(|r| (r.replica_id, format!("{}.{}", cluster.name(), &r.name)))
            .collect();
        for (replica_id, replica_name) in replica_ids {
            self.create_cluster_replica(cluster_id, replica_id, replica_name)
                .await;
        }

        if !introspection_source_ids.is_empty() {
            self.initialize_compute_read_policies(
                introspection_source_ids,
                cluster_id,
                CompactionWindow::Default,
            )
            .await;
        }
    }

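    /// Sequences a `CREATE CLUSTER REPLICA` statement: concretizes the replica
    /// location, enforces internal/billing restrictions, records the replica
    /// in the catalog, and then boots it.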
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_create_cluster_replica(
        &mut self,
        session: &Session,
        CreateClusterReplicaPlan {
            name,
            cluster_id,
            config,
        }: CreateClusterReplicaPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let (compute, location) = match config {
            mz_sql::plan::ReplicaConfig::Unorchestrated {
                storagectl_addrs,
                computectl_addrs,
                compute,
            } => {
                let location = mz_catalog::durable::ReplicaLocation::Unmanaged {
                    storagectl_addrs,
                    computectl_addrs,
                };
                (compute, location)
            }
            mz_sql::plan::ReplicaConfig::Orchestrated {
                availability_zone,
                billed_as,
                compute,
                internal,
                size,
            } => {
                let availability_zone = match availability_zone {
                    Some(az) => {
                        self.ensure_valid_azs([&az])?;
                        Some(az)
                    }
                    None => None,
                };
                let location = mz_catalog::durable::ReplicaLocation::Managed {
                    availability_zone,
                    billed_as,
                    internal,
                    size,
                    pending: false,
                };
                (compute, location)
            }
        };

        let logging = if let Some(config) = compute.introspection {
            ReplicaLogging {
                log_logging: config.debugging,
                interval: Some(config.interval),
            }
        } else {
            ReplicaLogging::default()
        };

        let role_id = session.role_metadata().current_role;
        let config = ReplicaConfig {
            location: self.catalog().concretize_replica_location(
                location,
                &self
                    .catalog()
                    .get_role_allowed_cluster_sizes(&Some(role_id)),
                None,
            )?,
            compute: ComputeReplicaConfig { logging },
        };

        let cluster = self.catalog().get_cluster(cluster_id);

        if let ReplicaLocation::Managed(ManagedReplicaLocation {
            internal,
            billed_as,
            ..
        }) = &config.location
        {
            if !session.user().is_internal() && (*internal || billed_as.is_some()) {
                coord_bail!("cannot specify INTERNAL or BILLED AS as non-internal user")
            }
            if cluster.is_managed() && !*internal {
                coord_bail!("must specify INTERNAL when creating a replica in a managed cluster");
            }
            if billed_as.is_some() && !*internal {
                coord_bail!("must specify INTERNAL when specifying BILLED AS");
            }
        }

        let owner_id = cluster.owner_id();
        let op = catalog::Op::CreateClusterReplica {
            cluster_id,
            name: name.clone(),
            config,
            owner_id,
            reason: ReplicaCreateDropReason::Manual,
        };

        self.catalog_transact(Some(session), vec![op]).await?;

        let id = self
            .catalog()
            .resolve_replica_in_cluster(&cluster_id, &name)
            .expect("just created")
            .replica_id();

        self.create_cluster_replica(cluster_id, id, name).await;

        Ok(ExecuteResponse::CreatedClusterReplica)
    }

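    /// Registers an already cataloged replica with the controller and installs
    /// its introspection subscribes.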
    async fn create_cluster_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        replica_name: String,
    ) {
        let cluster = self.catalog().get_cluster(cluster_id);
        let role = cluster.role();
        let replica_config = cluster
            .replica(replica_id)
            .expect("known to exist")
            .config
            .clone();

        let enable_worker_core_affinity =
            self.catalog().system_config().enable_worker_core_affinity();

        self.controller
            .create_replica(
                cluster_id,
                replica_id,
                cluster.name.to_owned(),
                replica_name,
                role,
                replica_config,
                enable_worker_core_affinity,
            )
            .expect("creating replicas must not fail");

        self.install_introspection_subscribes(cluster_id, replica_id)
            .await;
    }

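    /// Sequences a managed-to-managed cluster alteration and reports whether a
    /// separate finalization stage is still required.
    ///
    /// Pure replication-factor changes are applied directly. Changes to size,
    /// availability zones, or logging either replace all replicas immediately
    /// (no alter strategy) or create `-pending`-suffixed replicas to be
    /// promoted later, in which case `NeedsFinalization::Yes` is returned.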
    pub(crate) async fn sequence_alter_cluster_managed_to_managed(
        &mut self,
        session: Option<&Session>,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        reason: ReplicaCreateDropReason,
        strategy: AlterClusterPlanStrategy,
    ) -> Result<NeedsFinalization, AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let name = cluster.name().to_string();
        let owner_id = cluster.owner_id();

        let mut ops = vec![];
        let mut create_cluster_replicas = vec![];
        let mut finalization_needed = NeedsFinalization::No;

        let ClusterVariant::Managed(ClusterVariantManaged {
            size,
            availability_zones,
            logging,
            replication_factor,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &cluster.config.variant
        else {
            panic!("expected existing managed cluster config");
        };
        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: new_logging,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        let role_id = session.map(|s| s.role_metadata().current_role);
        self.catalog.ensure_valid_replica_size(
            &self.catalog().get_role_allowed_cluster_sizes(&role_id),
            new_size,
        )?;

        // Disallow stacking alters: a cluster with pending replicas is
        // already mid-reconfiguration.
        if cluster.replicas().any(|r| r.config.location.pending()) {
            coord_bail!("Cannot alter clusters with pending updates.")
        }

        let compute = mz_sql::plan::ComputeReplicaConfig {
            introspection: new_logging
                .interval
                .map(|interval| ComputeReplicaIntrospectionConfig {
                    debugging: new_logging.log_logging,
                    interval,
                }),
        };

        if new_replication_factor > replication_factor {
            // When the replication factor grows, check the additional replicas
            // against the per-cluster replica limit (user clusters only).
            if cluster_id.is_user() {
                self.validate_resource_limit(
                    usize::cast_from(*replication_factor),
                    i64::from(*new_replication_factor) - i64::from(*replication_factor),
                    SystemVars::max_replicas_per_cluster,
                    "cluster replica",
                    MAX_REPLICAS_PER_CLUSTER.name(),
                )?;
            }
        }

        if new_size != size
            || new_availability_zones != availability_zones
            || new_logging != logging
        {
            self.ensure_valid_azs(new_availability_zones.iter())?;
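            // With no strategy, the existing replicas are dropped and replaced
            // in a single transaction. With a `For` or `UntilReady` strategy,
            // `-pending` replicas are created alongside the old ones and only
            // promoted at finalization, keeping the cluster available
            // throughout.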
            match strategy {
                AlterClusterPlanStrategy::None => {
                    let replica_ids_and_reasons = (0..*replication_factor)
                        .map(managed_cluster_replica_name)
                        .filter_map(|name| cluster.replica_id(&name))
                        .map(|replica_id| {
                            catalog::DropObjectInfo::ClusterReplica((
                                cluster.id(),
                                replica_id,
                                reason.clone(),
                            ))
                        })
                        .collect();
                    ops.push(catalog::Op::DropObjects(replica_ids_and_reasons));
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            false,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                }
                AlterClusterPlanStrategy::For(_) | AlterClusterPlanStrategy::UntilReady { .. } => {
                    for name in (0..*new_replication_factor).map(managed_cluster_replica_name) {
                        let name = format!("{name}{PENDING_REPLICA_SUFFIX}");
                        self.create_managed_cluster_replica_op(
                            cluster_id,
                            name.clone(),
                            &compute,
                            new_size,
                            &mut ops,
                            Some(new_availability_zones.as_ref()),
                            true,
                            owner_id,
                            reason.clone(),
                        )?;
                        create_cluster_replicas.push((cluster_id, name));
                    }
                    finalization_needed = NeedsFinalization::Yes;
                }
            }
        } else if new_replication_factor < replication_factor {
            // Only the replication factor shrank: drop the excess replicas.
            let replica_ids = (*new_replication_factor..*replication_factor)
                .map(managed_cluster_replica_name)
                .filter_map(|name| cluster.replica_id(&name))
                .map(|replica_id| {
                    catalog::DropObjectInfo::ClusterReplica((
                        cluster.id(),
                        replica_id,
                        reason.clone(),
                    ))
                })
                .collect();
            ops.push(catalog::Op::DropObjects(replica_ids));
        } else if new_replication_factor > replication_factor {
            // Only the replication factor grew: create the additional
            // replicas.
            for name in
                (*replication_factor..*new_replication_factor).map(managed_cluster_replica_name)
            {
                self.create_managed_cluster_replica_op(
                    cluster_id,
                    name.clone(),
                    &compute,
                    new_size,
                    &mut ops,
                    Some(new_availability_zones.as_ref()),
                    false,
                    owner_id,
                    reason.clone(),
                )?;
                create_cluster_replicas.push((cluster_id, name))
            }
        }

        // When finalization is needed, the config update is deferred to the
        // finalize stage; otherwise commit it now.
        match finalization_needed {
            NeedsFinalization::No => {
                ops.push(catalog::Op::UpdateClusterConfig {
                    id: cluster_id,
                    name: name.clone(),
                    config: new_config,
                });
            }
            _ => {}
        }
        self.catalog_transact(session, ops.clone()).await?;
        for (cluster_id, replica_name) in create_cluster_replicas {
            let replica_id = self
                .catalog()
                .resolve_replica_in_cluster(&cluster_id, &replica_name)
                .expect("just created")
                .replica_id();
            self.create_cluster_replica(cluster_id, replica_id, replica_name)
                .await;
        }
        Ok(finalization_needed)
    }

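    /// Converts an unmanaged cluster to a managed one, checking that the
    /// existing replicas' names, sizes, and availability zones are consistent
    /// with the requested managed configuration.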
    async fn sequence_alter_cluster_unmanaged_to_managed(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        mut new_config: ClusterConfig,
        options: PlanClusterOption,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog.get_cluster(cluster_id);
        let cluster_name = cluster.name().to_string();

        let ClusterVariant::Managed(ClusterVariantManaged {
            size: new_size,
            replication_factor: new_replication_factor,
            availability_zones: new_availability_zones,
            logging: _,
            optimizer_feature_overrides: _,
            schedule: _,
        }) = &mut new_config.variant
        else {
            panic!("expected new managed cluster config");
        };

        // The replication factor must match the existing number of user
        // replicas, or be left unset so it can be inferred from them.
        let user_replica_count = cluster
            .user_replicas()
            .count()
            .try_into()
            .expect("must fit");
        match options.replication_factor {
            AlterOptionParameter::Set(_) => {
                if user_replica_count != *new_replication_factor {
                    coord_bail!(
                        "REPLICATION FACTOR {new_replication_factor} does not match number of replicas ({user_replica_count})"
                    );
                }
            }
            _ => {
                *new_replication_factor = user_replica_count;
            }
        }

        let mut names = BTreeSet::new();
        let mut sizes = BTreeSet::new();

        self.ensure_valid_azs(new_availability_zones.iter())?;

        // Validate the per-replica configuration.
        for replica in cluster.user_replicas() {
            names.insert(replica.name.clone());
            match &replica.config.location {
                ReplicaLocation::Unmanaged(_) => coord_bail!(
                    "Cannot convert unmanaged cluster with unmanaged replicas to managed cluster"
                ),
                ReplicaLocation::Managed(location) => {
                    sizes.insert(location.size.clone());

                    if let ManagedReplicaAvailabilityZones::FromReplica(Some(az)) =
                        &location.availability_zones
                    {
                        if !new_availability_zones.contains(az) {
                            coord_bail!(
                                "unmanaged replica has availability zone {az} which is not \
                                in managed {new_availability_zones:?}"
                            )
                        }
                    }
                }
            }
        }

        if sizes.is_empty() {
            assert!(
                cluster.user_replicas().next().is_none(),
                "Cluster should not have replicas"
            );
            // An empty cluster has no replicas to infer a size from, so SIZE
            // must be given explicitly.
            match &options.size {
                AlterOptionParameter::Reset | AlterOptionParameter::Unchanged => {
                    coord_bail!("Missing SIZE for empty cluster")
                }
                _ => {}
            }
        } else if sizes.len() == 1 {
            let size = sizes.into_iter().next().expect("must exist");
            match &options.size {
                AlterOptionParameter::Set(sz) if *sz != size => {
                    coord_bail!("Cluster replicas of size {size} do not match expected SIZE {sz}");
                }
                _ => *new_size = size,
            }
        } else {
            let formatted = sizes
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, non-unique replica sizes: {formatted}"
            );
        }

        // All replica names must follow the managed naming scheme (`r1`,
        // `r2`, ...); anything else cannot be adopted.
        for i in 0..*new_replication_factor {
            let name = managed_cluster_replica_name(i);
            names.remove(&name);
        }
        if !names.is_empty() {
            let formatted = names
                .iter()
                .map(String::as_str)
                .collect::<Vec<_>>()
                .join(", ");
            coord_bail!(
                "Cannot convert unmanaged cluster to managed, invalid replica names: {formatted}"
            );
        }

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster_name,
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    async fn sequence_alter_cluster_managed_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
    ) -> Result<(), AdapterError> {
        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    async fn sequence_alter_cluster_unmanaged_to_unmanaged(
        &mut self,
        session: &Session,
        cluster_id: ClusterId,
        new_config: ClusterConfig,
        replicas: AlterOptionParameter<Vec<(String, mz_sql::plan::ReplicaConfig)>>,
    ) -> Result<(), AdapterError> {
        if !matches!(replicas, AlterOptionParameter::Unchanged) {
            coord_bail!("Cannot alter replicas in unmanaged cluster");
        }

        let cluster = self.catalog().get_cluster(cluster_id);

        let ops = vec![catalog::Op::UpdateClusterConfig {
            id: cluster_id,
            name: cluster.name().to_string(),
            config: new_config,
        }];

        self.catalog_transact(Some(session), ops).await?;
        Ok(())
    }

    pub(crate) async fn sequence_alter_cluster_rename(
        &mut self,
        ctx: &mut ExecuteContext,
        AlterClusterRenamePlan { id, name, to_name }: AlterClusterRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = Op::RenameCluster {
            id,
            name,
            to_name,
            check_reserved_names: true,
        };
        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op], |_, _| Box::pin(async {}))
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_cluster_swap(
        &mut self,
        ctx: &mut ExecuteContext,
        AlterClusterSwapPlan {
            id_a,
            id_b,
            name_a,
            name_b,
            name_temp,
        }: AlterClusterSwapPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Swap the two names via a temporary name: a -> temp, b -> a,
        // temp -> b.
        let op_a = Op::RenameCluster {
            id: id_a,
            name: name_a.clone(),
            to_name: name_temp.clone(),
            check_reserved_names: false,
        };
        let op_b = Op::RenameCluster {
            id: id_b,
            name: name_b.clone(),
            to_name: name_a,
            check_reserved_names: false,
        };
        let op_temp = Op::RenameCluster {
            id: id_a,
            name: name_temp,
            to_name: name_b,
            check_reserved_names: false,
        };

        match self
            .catalog_transact_with_ddl_transaction(ctx, vec![op_a, op_b, op_temp], |_, _| {
                Box::pin(async {})
            })
            .await
        {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::Cluster)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_cluster_replica_rename(
        &mut self,
        session: &Session,
        AlterClusterReplicaRenamePlan {
            cluster_id,
            replica_id,
            name,
            to_name,
        }: AlterClusterReplicaRenamePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let op = catalog::Op::RenameClusterReplica {
            cluster_id,
            replica_id,
            name,
            to_name,
        };
        match self.catalog_transact(Some(session), vec![op]).await {
            Ok(()) => Ok(ExecuteResponse::AlteredObject(ObjectType::ClusterReplica)),
            Err(err) => Err(err),
        }
    }

    pub(crate) async fn sequence_alter_set_cluster(
        &self,
        _session: &Session,
        AlterSetClusterPlan { id, set_cluster: _ }: AlterSetClusterPlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Await a no-op future; nothing here is asynchronous yet.
        async {}.await;
        let entry = self.catalog().get_entry(&id);
        match entry.item().typ() {
            _ => {
                // No object type currently supports ALTER ... SET CLUSTER.
                Err(AdapterError::Unsupported("ALTER SET CLUSTER"))
            }
        }
    }
}

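/// Returns the name of the managed replica at `index`: `r1` for index 0, `r2`
/// for index 1, and so on.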
fn managed_cluster_replica_name(index: u32) -> String {
    format!("r{}", index + 1)
}

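/// Whether a cluster alteration still needs a finalization stage before the
/// new configuration is committed.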
#[derive(PartialEq)]
pub(crate) enum NeedsFinalization {
    /// Pending replicas were created; a finalize stage must promote them and
    /// commit the new configuration.
    Yes,
    /// The alteration was applied in full; no finalization is required.
    No,
}