1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use chrono::{DateTime, Utc};
21use futures::stream::{BoxStream, StreamExt};
22use mz_cluster_client::client::ClusterReplicaLocation;
23use mz_compute_client::controller::ComputeControllerTimestamp;
24use mz_compute_client::logging::LogVariant;
25use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
26use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
27use mz_controller_types::dyncfgs::CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL;
28use mz_controller_types::{ClusterId, ReplicaId};
29use mz_orchestrator::NamespacedOrchestrator;
30use mz_orchestrator::{
31 CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
32 ServiceEvent, ServicePort,
33};
34use mz_ore::halt;
35use mz_ore::instrument;
36use mz_ore::task::{self, AbortOnDropHandle};
37use mz_repr::GlobalId;
38use mz_repr::adt::numeric::Numeric;
39use regex::Regex;
40use serde::{Deserialize, Serialize};
41use tokio::time;
42use tracing::{error, info, warn};
43
44use crate::Controller;
45
/// Configuration for a new cluster, consumed by `Controller::create_cluster`.
pub struct ClusterConfig {
    /// Log variants and the IDs of their arrangements; handed to the compute controller's
    /// `create_instance`.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional workload class label for the cluster; handed to the compute controller's
    /// `create_instance`.
    pub workload_class: Option<String>,
}
57
/// The status of a replica process, as reported by the orchestrator in a `ServiceEvent`.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
60
/// Configuration for a new cluster replica, consumed by `Controller::create_replica`.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// Where the replica's processes run: either provisioned by the orchestrator (managed) or
    /// reachable at caller-supplied addresses (unmanaged).
    pub location: ReplicaLocation,
    /// Compute-specific configuration, handed to the compute controller when the replica is
    /// added to its instance.
    pub compute: ComputeReplicaConfig,
}
69
/// The resources allocated to a managed replica.
///
/// Deserializable from JSON. Fields annotated with `#[serde(default)]` may be omitted from the
/// input and take the default noted on each field.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// Memory limit, if any. Passed to the orchestrator's `ServiceConfig` and announced to the
    /// replica processes via `--announce-memory-limit` (in bytes).
    pub memory_limit: Option<MemoryLimit>,
    /// CPU limit, if any. Passed to the orchestrator's `ServiceConfig`.
    pub cpu_limit: Option<CpuLimit>,
    /// Disk limit, if any. Passed to the orchestrator's `ServiceConfig`.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes that make up the replica.
    pub scale: u16,
    /// The number of workers in each process.
    pub workers: usize,
    /// Credit cost per hour. Deserialized from a string (e.g. `"16"`).
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Defaults to `false`. When set, and the caller of `create_replica` enables worker core
    /// affinity, replica processes are started with `--worker-core-affinity`.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Defaults to `true`. When set, replica processes are started with `--is-cc`.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Defaults to `false`. NOTE(review): not read in this file; presumably consumers treat a
    /// disabled allocation as unavailable for new replicas — confirm.
    #[serde(default)]
    pub disabled: bool,
    /// Defaults to an empty map. Passed to the orchestrator as the service's `node_selector`.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
100
/// Serde default helper that always returns `true`; used for `ReplicaAllocation::is_cc`.
fn default_true() -> bool {
    true
}
104
105#[mz_ore::test]
106#[cfg_attr(miri, ignore)] fn test_replica_allocation_deserialization() {
109 use bytesize::ByteSize;
110
111 let data = r#"
112 {
113 "cpu_limit": 1.0,
114 "memory_limit": "10GiB",
115 "disk_limit": "100MiB",
116 "scale": 16,
117 "workers": 1,
118 "credits_per_hour": "16",
119 "selectors": {
120 "key1": "value1",
121 "key2": "value2"
122 }
123 }"#;
124
125 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
126 .expect("deserialization from JSON succeeds for ReplicaAllocation");
127
128 assert_eq!(
129 replica_allocation,
130 ReplicaAllocation {
131 credits_per_hour: 16.into(),
132 disk_limit: Some(DiskLimit(ByteSize::mib(100))),
133 disabled: false,
134 memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
135 cpu_limit: Some(CpuLimit::from_millicpus(1000)),
136 cpu_exclusive: false,
137 is_cc: true,
138 scale: 16,
139 workers: 1,
140 selectors: BTreeMap::from([
141 ("key1".to_string(), "value1".to_string()),
142 ("key2".to_string(), "value2".to_string())
143 ]),
144 }
145 );
146
147 let data = r#"
148 {
149 "cpu_limit": 0,
150 "memory_limit": "0GiB",
151 "disk_limit": "0MiB",
152 "scale": 0,
153 "workers": 0,
154 "credits_per_hour": "0",
155 "cpu_exclusive": true,
156 "disabled": true
157 }"#;
158
159 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
160 .expect("deserialization from JSON succeeds for ReplicaAllocation");
161
162 assert_eq!(
163 replica_allocation,
164 ReplicaAllocation {
165 credits_per_hour: 0.into(),
166 disk_limit: Some(DiskLimit(ByteSize::mib(0))),
167 disabled: true,
168 memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
169 cpu_limit: Some(CpuLimit::from_millicpus(0)),
170 cpu_exclusive: true,
171 is_cc: true,
172 scale: 0,
173 workers: 0,
174 selectors: Default::default(),
175 }
176 );
177}
178
/// Where the processes of a cluster replica run.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// The replica's processes are not provisioned by the controller; the caller supplies the
    /// addresses to connect to.
    Unmanaged(UnmanagedReplicaLocation),
    /// The replica's processes are provisioned through the orchestrator by the controller.
    Managed(ManagedReplicaLocation),
}
187
188impl ReplicaLocation {
189 pub fn num_processes(&self) -> usize {
191 match self {
192 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
193 computectl_addrs, ..
194 }) => computectl_addrs.len(),
195 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
196 allocation.scale.into()
197 }
198 }
199 }
200
201 pub fn billed_as(&self) -> Option<&str> {
202 match self {
203 ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
204 billed_as.as_deref()
205 }
206 _ => None,
207 }
208 }
209
210 pub fn internal(&self) -> bool {
211 match self {
212 ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
213 ReplicaLocation::Unmanaged(_) => false,
214 }
215 }
216
217 pub fn workers(&self) -> usize {
218 let workers_per_process = match self {
219 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
220 allocation.workers
221 }
222 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation { workers, .. }) => *workers,
223 };
224 workers_per_process * self.num_processes()
225 }
226
227 pub fn pending(&self) -> bool {
232 match self {
233 ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
234 _ => false,
235 }
236 }
237}
238
/// The role of a cluster, recorded on each provisioned replica as the `replica-role` label.
///
/// Derives the standard value traits so the role can be compared, copied, and included in
/// debug output and tracing fields.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ClusterRole {
    /// Labelled `system-critical`.
    SystemCritical,
    /// Labelled `system`.
    System,
    /// Labelled `user`.
    User,
}
253
/// The location of a replica whose processes are not provisioned by the controller.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// Addresses the storage controller connects to (becomes `ctl_addrs` of the storage
    /// location).
    pub storagectl_addrs: Vec<String>,
    /// Storage dataflow addresses (becomes `dataflow_addrs` of the storage location).
    pub storage_addrs: Vec<String>,
    /// Addresses the compute controller connects to. Its length also determines
    /// `ReplicaLocation::num_processes` for unmanaged replicas.
    pub computectl_addrs: Vec<String>,
    /// Compute dataflow addresses (becomes `dataflow_addrs` of the compute location).
    pub compute_addrs: Vec<String>,
    /// Workers per process; used for both the storage and the compute location.
    pub workers: usize,
}
272
/// Where the availability-zone constraint for a managed replica comes from.
///
/// Either variant is converted into the `Option<Vec<String>>` passed to the orchestrator when
/// the replica is provisioned.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Zones specified at the cluster level; passed through unchanged (`None` means no
    /// constraint).
    FromCluster(Option<Vec<String>>),
    /// A single zone specified on the replica itself; becomes a one-element list (`None` means
    /// no constraint).
    FromReplica(Option<String>),
}
284
/// The location of a replica whose processes are provisioned by the controller via the
/// orchestrator.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resources allocated to the replica.
    pub allocation: ReplicaAllocation,
    /// The size name of the replica; attached as the `size` label and used for billing when
    /// `billed_as` is unset.
    pub size: String,
    /// Whether the replica is internal; surfaced by `ReplicaLocation::internal`.
    pub internal: bool,
    /// An alternative size name to report for billing, overriding `size`.
    pub billed_as: Option<String>,
    /// Availability-zone constraint for the replica. Excluded from serialization.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Passed to the orchestrator's `ServiceConfig::disk`.
    pub disk: bool,
    /// Whether the replica is pending; surfaced by `ReplicaLocation::pending`.
    pub pending: bool,
}
316
317impl ManagedReplicaLocation {
318 pub fn size_for_billing(&self) -> &str {
320 self.billed_as.as_deref().unwrap_or(&self.size)
321 }
322}
323
/// Logging configuration for a replica; an alias of the compute layer's replica logging type.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a single process within a replica, as reported in orchestrator events.
pub type ProcessId = u64;
329
/// A status change of one process of a cluster replica, produced by
/// `Controller::events_stream` from an orchestrator `ServiceEvent`.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica whose process changed status.
    pub replica_id: ReplicaId,
    /// The process within the replica.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// When the status change happened, as reported by the orchestrator.
    pub time: DateTime<Utc>,
}
339
/// Cluster and replica lifecycle management on the controller.
///
/// Each cluster is backed by a storage instance and a compute instance with the same ID. Each
/// managed replica is backed by one orchestrator service whose name encodes the cluster ID,
/// replica ID and deploy generation (see `ReplicaServiceName`).
impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
    /// Creates a cluster with the given ID and configuration.
    ///
    /// Creates a storage instance and then a compute instance, both under `id`. The storage
    /// instance is created first, so if compute creation fails the storage instance has already
    /// been created and is not rolled back here.
    ///
    /// # Errors
    ///
    /// Returns any error from the compute controller's `create_instance`.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage.create_instance(id);
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Sets the workload class of cluster `id` on both the storage and compute instances.
    ///
    /// # Errors
    ///
    /// Returns any error from the compute controller's `update_instance_workload_class`. The
    /// storage update has already been applied by then.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the storage and compute instances of cluster `id`.
    ///
    /// Replicas are not deprovisioned here. NOTE(review): presumably callers drop all replicas
    /// (via `drop_replica`) first — confirm.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of cluster `cluster_id` and connects it to storage and compute.
    ///
    /// Managed replicas are first provisioned through the orchestrator, and their addresses are
    /// taken from the resulting service. A metrics-polling task is started for them and
    /// tracked in `metrics_tasks`. Unmanaged replicas use the caller-supplied addresses and get
    /// no metrics task.
    ///
    /// # Errors
    ///
    /// Returns an error if provisioning fails or if the compute controller rejects the
    /// replica. In the latter case the storage replica has already been connected and the
    /// provisioned service is not torn down here. The metrics task handle is dropped, which
    /// aborts it.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        // Deferred initialization: each `match` arm below assigns all three exactly once.
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                storage_addrs,
                computectl_addrs,
                compute_addrs,
                workers,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                    dataflow_addrs: compute_addrs,
                    workers,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                    dataflow_addrs: storage_addrs,
                    // Storage uses the same per-process worker count as compute.
                    workers,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                // Read before `m` is moved into `provision_replica`.
                let workers = m.allocation.workers;
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    role,
                    m,
                    enable_worker_core_affinity,
                )?;
                // Addresses are looked up by the port names declared in `provision_replica`.
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                    dataflow_addrs: service.addresses("storage"),
                    workers,
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                    dataflow_addrs: service.addresses("compute"),
                    workers,
                };
                metrics_task = Some(metrics_task_join_handle);
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        // Only keep the metrics task once the replica was registered successfully; an early
        // return above drops (and thereby aborts) it.
        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops replica `replica_id` of cluster `cluster_id`.
    ///
    /// Deprovisioning is attempted unconditionally, without checking whether the replica is
    /// managed. NOTE(review): this relies on the orchestrator's `drop_service` tolerating
    /// unknown service names — confirm. Afterwards the replica's metrics task (if any) is
    /// removed, which aborts it, and the replica is dropped from compute and storage.
    ///
    /// # Errors
    ///
    /// Returns an error from deprovisioning (before anything else is changed) or from the
    /// compute controller's `drop_replica`.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Spawns a detached task that drops every replica service from a deploy generation older
    /// than the current one.
    ///
    /// On failure the task logs a warning, sleeps for
    /// `CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL`, and retries. The interval
    /// is re-read on each failure. The task exits after the first successful pass.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Deprovisions orchestrator services of the current deploy generation that belong to
    /// replicas this controller is not tracking.
    ///
    /// The set of known replicas is the key set of `metrics_tasks`, i.e. the managed replicas
    /// created through `create_replica`. Services from other generations are skipped.
    ///
    /// If a current-generation service has a replica ID at or above the corresponding
    /// `next_*_replica_id`, the process halts. The IDs visible in this function suggest the
    /// service was created by another process allocating IDs for this generation.
    /// NOTE(review): confirm that rationale.
    ///
    /// # Errors
    ///
    /// Returns an error if listing services fails, if any service name fails to parse as a
    /// `ReplicaServiceName`, or if deprovisioning fails.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // Only the current generation is considered here; older generations are handled
            // by `remove_past_generation_replicas_in_background`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of replica process status events from the orchestrator.
    ///
    /// Only events for services of the current deploy generation are yielded. Watch errors
    /// and service names that fail to parse as a `ReplicaServiceName` are logged at `error`
    /// level and skipped.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        // Converts an orchestrator event into a `ClusterEvent`, also returning the generation
        // parsed from the service name so the caller can filter on it.
        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Ensures an orchestrator service for a managed replica exists, and starts polling its
    /// metrics.
    ///
    /// The service is named after the replica and the current deploy generation. Its
    /// processes receive their listen addresses and the connection/allocation settings as
    /// command-line flags. It exposes the ports `storagectl`, `storage`, `computectl`,
    /// `compute` and `internal-http`, and is labelled with the replica and cluster IDs, role,
    /// worker count and size.
    ///
    /// Returns the service handle together with a handle to a task that fetches the service's
    /// metrics every 60 seconds and sends them on `metrics_tx`. The task is aborted when the
    /// handle is dropped.
    ///
    /// # Errors
    ///
    /// Returns any error from the orchestrator's `ensure_service`.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        // Owned copies for the `move` args closure below.
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();
        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                // Builds the process arguments from the addresses the orchestrator assigned to
                // each named port.
                args: Box::new(move |assigned| {
                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned["computectl"]
                        ),
                        format!("--internal-http-listen-addr={}", assigned["internal-http"]),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    // Core affinity requires both the allocation and the caller to opt in.
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    // Cloned rather than moved, presumably because the closure may be invoked
                    // more than once — confirm against `ServiceConfig::args`.
                    args.extend(secrets_args.clone());

                    args
                }),
                // Note: the port hints are not in numeric order; `storage` (2103) is numbered
                // after the compute ports (2101, 2102).
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                memory_limit: location.allocation.memory_limit,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    ("size".into(), location.size.to_string()),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // Selects services of the same cluster other than this replica, matched on the
                // labels set above.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                // Selects all services of this cluster, including this replica.
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit: location.allocation.disk_limit,
                disk: location.disk,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // A tokio interval's first tick completes immediately, so metrics are fetched
                // once right away and then every `METRICS_INTERVAL`.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            // Send failures are deliberately ignored (presumably the receiver
                            // is gone during shutdown); the loop ends when the task is aborted.
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Drops the orchestrator service of replica `replica_id` in deploy generation
    /// `generation`.
    ///
    /// # Errors
    ///
    /// Returns any error from the orchestrator's `drop_service`.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}
799
800async fn try_remove_past_generation_replicas(
802 orchestrator: &dyn NamespacedOrchestrator,
803 deploy_generation: u64,
804) -> Result<(), anyhow::Error> {
805 let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
806
807 for service in services {
808 let name: ReplicaServiceName = service.parse()?;
809 if name.generation < deploy_generation {
810 info!(
811 cluster_id = %name.cluster_id,
812 replica_id = %name.replica_id,
813 "removing past generation replica",
814 );
815 orchestrator.drop_service(&service)?;
816 }
817 }
818
819 Ok(())
820}
821
822#[derive(PartialEq, Eq, PartialOrd, Ord)]
824pub struct ReplicaServiceName {
825 pub cluster_id: ClusterId,
826 pub replica_id: ReplicaId,
827 pub generation: u64,
828}
829
830impl fmt::Display for ReplicaServiceName {
831 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
832 let ReplicaServiceName {
833 cluster_id,
834 replica_id,
835 generation,
836 } = self;
837 write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
838 }
839}
840
841impl FromStr for ReplicaServiceName {
842 type Err = anyhow::Error;
843
844 fn from_str(s: &str) -> Result<Self, Self::Err> {
845 static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
846 Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
847 });
848
849 let caps = SERVICE_NAME_RE
850 .captures(s)
851 .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
852
853 Ok(ReplicaServiceName {
854 cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
855 replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
856 generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
860 })
861 }
862}