1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::logging::LogVariant;
26use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
27use mz_controller_types::dyncfgs::{
28 ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
29 ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
30};
31use mz_controller_types::{ClusterId, ReplicaId};
32use mz_orchestrator::NamespacedOrchestrator;
33use mz_orchestrator::{
34 CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
35 ServiceEvent, ServicePort,
36};
37use mz_ore::cast::CastInto;
38use mz_ore::task::{self, AbortOnDropHandle};
39use mz_ore::{halt, instrument};
40use mz_repr::GlobalId;
41use mz_repr::adt::numeric::Numeric;
42use regex::Regex;
43use serde::{Deserialize, Serialize};
44use tokio::time;
45use tracing::{error, info, warn};
46
47use crate::Controller;
48
/// Configuration for a new cluster, consumed by `Controller::create_cluster`.
pub struct ClusterConfig {
    /// The logs to maintain as arrangements on the cluster's compute instance, keyed by
    /// log variant.
    ///
    /// This map is handed to the compute controller when the cluster is created.
    /// NOTE(review): the meaning of the mapped `GlobalId` (presumably the ID of the
    /// arranged log collection) is inferred from the type — confirm.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional workload class for the cluster.
    ///
    /// It is passed to both the storage and the compute controller when the cluster is
    /// created.
    pub workload_class: Option<String>,
}
60
/// The status of a replica process, as reported by the orchestrator.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
63
/// Configuration for a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// Where the replica's processes run and how the controller reaches them.
    pub location: ReplicaLocation,
    /// Compute-specific configuration, handed to the compute controller when the replica
    /// is added to its instance.
    pub compute: ComputeReplicaConfig,
}
72
/// The resources allocated to a managed replica.
///
/// Omitted optional fields deserialize to `None`/`false`/empty, except `is_cc`, which
/// defaults to `true`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit passed to the orchestrator.
    ///
    /// It is also announced to clusterd via `--announce-memory-limit`.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit passed to the orchestrator.
    pub cpu_limit: Option<CpuLimit>,
    /// The CPU request passed to the orchestrator.
    pub cpu_request: Option<CpuLimit>,
    /// The disk limit passed to the orchestrator.
    ///
    /// This is replaced by zero when `swap_enabled` is set.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes that make up the replica.
    pub scale: NonZero<u16>,
    /// The number of timely workers in each process.
    pub workers: NonZero<usize>,
    /// The number of credits per hour for this allocation.
    ///
    /// It is deserialized from a string, e.g. `"16"`.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether the allocation's CPUs are meant to be exclusive to the replica.
    ///
    /// When set, and worker core affinity is enabled, clusterd is started with
    /// `--worker-core-affinity`.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether clusterd is started with `--is-cc`.
    ///
    /// It defaults to `true` when absent, and it also selects the fallback value of
    /// [`ReplicaAllocation::family`].
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// An explicit family name for this allocation.
    ///
    /// When absent, the family is derived from `is_cc` (see
    /// [`ReplicaAllocation::family`]).
    #[serde(default)]
    pub family: Option<String>,
    /// Whether the replica runs with swap enabled.
    ///
    /// When set, the service's disk limit is forced to zero, its memory request is set
    /// just below its memory limit, and clusterd may receive a `--heap-limit` flag.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether this allocation is disabled.
    ///
    /// NOTE(review): this flag is not consulted in this module; its semantics are
    /// defined by callers — confirm.
    #[serde(default)]
    pub disabled: bool,
    /// Node selector labels passed to the orchestrator for the replica's service.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
118
119impl ReplicaAllocation {
120 pub fn family(&self) -> &str {
129 match &self.family {
130 Some(family) => family.as_str(),
131 None if self.is_cc => "cc",
132 None => "legacy",
133 }
134 }
135}
136
137fn default_true() -> bool {
138 true
139}
140
/// Checks that `ReplicaAllocation` deserializes from JSON, including its serde defaults,
/// and that zero `scale`/`workers` values are rejected.
#[mz_ore::test]
// NOTE(review): the reason this test is skipped under Miri is not visible here — confirm.
#[cfg_attr(miri, ignore)]
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;
    use mz_ore::{assert_err, assert_ok};

    // Limits, swap and node selectors are set; every omitted field takes its default
    // (`is_cc` defaults to `true`, everything else to `None`/`false`).
    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_request: None,
            cpu_exclusive: false,
            is_cc: true,
            family: None,
            swap_enabled: true,
            scale: NonZero::new(16).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    // Zero-valued limits are accepted, and the `cpu_exclusive`/`disabled` flags can be
    // set explicitly.
    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_request: None,
            cpu_exclusive: true,
            is_cc: true,
            family: None,
            swap_enabled: false,
            scale: NonZero::new(1).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: Default::default(),
        }
    );

    // `scale` and `workers` are `NonZero`, so zero is rejected. `scale`, `workers` and
    // `credits_per_hour` alone are enough for a valid allocation.
    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
}
230
/// Checks how `ReplicaAllocation::family` resolves explicit and derived family names.
#[mz_ore::test]
// NOTE(review): the reason this test is skipped under Miri is not visible here — confirm.
#[cfg_attr(miri, ignore)]
fn test_replica_allocation_family() {
    let parse = |json: &str| -> ReplicaAllocation {
        serde_json::from_str(json).expect("deserialization from JSON succeeds")
    };

    // An explicit family is returned as-is.
    assert_eq!(
        parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "family": "D"}"#).family(),
        "D"
    );
    // Without a family, the default `is_cc = true` yields "cc".
    assert_eq!(
        parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#).family(),
        "cc"
    );
    // Without a family and with `is_cc = false`, the family is "legacy".
    assert_eq!(
        parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "is_cc": false}"#).family(),
        "legacy"
    );
    // An explicit family takes precedence over the `is_cc`-derived one.
    assert_eq!(
        parse(
            r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "is_cc": false, "family": "legacy-special"}"#
        )
        .family(),
        "legacy-special"
    );
}
264
/// Where a replica's processes run.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// The replica's processes are not provisioned by the controller; they are reached at
    /// the configured addresses.
    Unmanaged(UnmanagedReplicaLocation),
    /// The replica's processes are provisioned by the controller through the orchestrator.
    Managed(ManagedReplicaLocation),
}
273
274impl ReplicaLocation {
275 pub fn num_processes(&self) -> usize {
277 match self {
278 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
279 computectl_addrs, ..
280 }) => computectl_addrs.len(),
281 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
282 allocation.scale.cast_into()
283 }
284 }
285 }
286
287 pub fn billed_as(&self) -> Option<&str> {
288 match self {
289 ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
290 billed_as.as_deref()
291 }
292 ReplicaLocation::Unmanaged(_) => None,
293 }
294 }
295
296 pub fn internal(&self) -> bool {
297 match self {
298 ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
299 ReplicaLocation::Unmanaged(_) => false,
300 }
301 }
302
303 pub fn workers(&self) -> Option<usize> {
307 match self {
308 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
309 Some(allocation.workers.get() * self.num_processes())
310 }
311 ReplicaLocation::Unmanaged(_) => None,
312 }
313 }
314
315 pub fn pending(&self) -> bool {
320 match self {
321 ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
322 ReplicaLocation::Unmanaged(_) => false,
323 }
324 }
325}
326
/// The role of a cluster.
///
/// It is recorded on each replica service as the `replica-role` label, with the values
/// `system-critical`, `system` and `user` respectively.
/// NOTE(review): any scheduling or priority semantics attached to these roles live outside
/// this module — confirm.
#[derive(Debug, Clone)]
pub enum ClusterRole {
    /// A system cluster, labelled `system-critical`.
    SystemCritical,
    /// A system cluster, labelled `system`.
    System,
    /// A user cluster, labelled `user`.
    User,
}
342
/// The location of a replica whose processes are not provisioned by the controller.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The addresses of the storage controller endpoints of the replica's processes.
    pub storagectl_addrs: Vec<String>,
    /// The addresses of the compute controller endpoints of the replica's processes.
    ///
    /// The number of entries determines the replica's process count.
    pub computectl_addrs: Vec<String>,
}
353
/// The location of a replica provisioned by the controller through the orchestrator.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resources allocated to the replica.
    pub allocation: ReplicaAllocation,
    /// The name of the replica's size.
    ///
    /// It is recorded (with `=` and `,` rewritten) in the `size` service label.
    pub size: String,
    /// Whether the replica is internal.
    pub internal: bool,
    /// The size name to bill the replica as, overriding `size` for billing when set.
    pub billed_as: Option<String>,
    /// The availability zones the replica's service may be placed in.
    ///
    /// An empty list places no zone constraint on the service. This field is not
    /// serialized.
    #[serde(skip)]
    pub availability_zones: Vec<String>,
    /// Whether the replica is pending.
    ///
    /// NOTE(review): what "pending" means is defined by callers — confirm.
    pub pending: bool,
}
380
381impl ManagedReplicaLocation {
382 pub fn size_for_billing(&self) -> &str {
384 self.billed_as.as_deref().unwrap_or(&self.size)
385 }
386}
387
/// Logging configuration of a replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifies a process within a replica.
pub type ProcessId = u64;
393
/// A status change of one process of a cluster replica, as emitted by
/// `Controller::events_stream`.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica the process belongs to.
    pub replica_id: ReplicaId,
    /// The process within the replica.
    pub process_id: ProcessId,
    /// The process's new status.
    pub status: ClusterStatus,
    /// When the orchestrator observed the status change.
    pub time: DateTime<Utc>,
}
403
404impl Controller {
405 pub fn create_cluster(
411 &mut self,
412 id: ClusterId,
413 config: ClusterConfig,
414 ) -> Result<(), anyhow::Error> {
415 self.storage
416 .create_instance(id, config.workload_class.clone());
417 self.compute
418 .create_instance(id, config.arranged_logs, config.workload_class)?;
419 Ok(())
420 }
421
422 pub fn update_cluster_workload_class(&mut self, id: ClusterId, workload_class: Option<String>) {
428 self.storage
429 .update_instance_workload_class(id, workload_class.clone());
430 self.compute
431 .update_instance_workload_class(id, workload_class)
432 .expect("instance exists");
433 }
434
435 pub fn drop_cluster(&mut self, id: ClusterId) {
441 self.storage.drop_instance(id);
442 self.compute.drop_instance(id);
443 }
444
445 pub fn create_replica(
448 &mut self,
449 cluster_id: ClusterId,
450 replica_id: ReplicaId,
451 cluster_name: String,
452 replica_name: String,
453 role: ClusterRole,
454 config: ReplicaConfig,
455 enable_worker_core_affinity: bool,
456 enable_storage_introspection_logs: bool,
457 ) -> Result<(), anyhow::Error> {
458 let storage_location: ClusterReplicaLocation;
459 let compute_location: ClusterReplicaLocation;
460 let metrics_task: Option<AbortOnDropHandle<()>>;
461
462 match config.location {
463 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
464 storagectl_addrs,
465 computectl_addrs,
466 }) => {
467 compute_location = ClusterReplicaLocation {
468 ctl_addrs: computectl_addrs,
469 };
470 storage_location = ClusterReplicaLocation {
471 ctl_addrs: storagectl_addrs,
472 };
473 metrics_task = None;
474 }
475 ReplicaLocation::Managed(m) => {
476 let (service, metrics_task_join_handle) = self.provision_replica(
477 cluster_id,
478 replica_id,
479 cluster_name,
480 replica_name,
481 role,
482 m,
483 enable_worker_core_affinity,
484 enable_storage_introspection_logs,
485 )?;
486 storage_location = ClusterReplicaLocation {
487 ctl_addrs: service.addresses("storagectl"),
488 };
489 compute_location = ClusterReplicaLocation {
490 ctl_addrs: service.addresses("computectl"),
491 };
492 metrics_task = Some(metrics_task_join_handle);
493
494 let http_addresses = service.addresses("internal-http");
496 self.replica_http_locator
497 .register_replica(cluster_id, replica_id, http_addresses);
498 }
499 }
500
501 self.storage
502 .connect_replica(cluster_id, replica_id, storage_location);
503 self.compute.add_replica_to_instance(
504 cluster_id,
505 replica_id,
506 compute_location,
507 config.compute,
508 )?;
509
510 if let Some(task) = metrics_task {
511 self.metrics_tasks.insert(replica_id, task);
512 }
513
514 Ok(())
515 }
516
517 pub fn drop_replica(
519 &mut self,
520 cluster_id: ClusterId,
521 replica_id: ReplicaId,
522 ) -> Result<(), anyhow::Error> {
523 self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
528 self.metrics_tasks.remove(&replica_id);
529
530 self.replica_http_locator
532 .remove_replica(cluster_id, replica_id);
533
534 self.compute.drop_replica(cluster_id, replica_id)?;
535 self.storage.drop_replica(cluster_id, replica_id);
536 Ok(())
537 }
538
539 pub(crate) fn remove_past_generation_replicas_in_background(&self) {
541 let deploy_generation = self.deploy_generation;
542 let dyncfg = Arc::clone(self.compute.dyncfg());
543 let orchestrator = Arc::clone(&self.orchestrator);
544 task::spawn(
545 || "controller_remove_past_generation_replicas",
546 async move {
547 info!("attempting to remove past generation replicas");
548 loop {
549 match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
550 .await
551 {
552 Ok(()) => {
553 info!("successfully removed past generation replicas");
554 return;
555 }
556 Err(e) => {
557 let interval =
558 CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
559 .get(&dyncfg);
560 warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
561 time::sleep(interval).await;
562 }
563 }
564 }
565 },
566 );
567 }
568
569 #[instrument]
571 pub async fn remove_orphaned_replicas(
572 &mut self,
573 next_user_replica_id: u64,
574 next_system_replica_id: u64,
575 ) -> Result<(), anyhow::Error> {
576 let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();
577
578 let actual: BTreeSet<_> = self
579 .orchestrator
580 .list_services()
581 .await?
582 .iter()
583 .map(|s| ReplicaServiceName::from_str(s))
584 .collect::<Result<_, _>>()?;
585
586 for ReplicaServiceName {
587 cluster_id,
588 replica_id,
589 generation,
590 } in actual
591 {
592 if generation != self.deploy_generation {
596 continue;
597 }
598
599 let smaller_next = match replica_id {
600 ReplicaId::User(id) if id >= next_user_replica_id => {
601 Some(ReplicaId::User(next_user_replica_id))
602 }
603 ReplicaId::System(id) if id >= next_system_replica_id => {
604 Some(ReplicaId::System(next_system_replica_id))
605 }
606 _ => None,
607 };
608 if let Some(next) = smaller_next {
609 halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
614 }
615 if !desired.contains(&replica_id) {
616 self.deprovision_replica(cluster_id, replica_id, generation)?;
617 }
618 }
619
620 Ok(())
621 }
622
623 pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
624 let deploy_generation = self.deploy_generation;
625
626 fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
627 let ReplicaServiceName {
628 cluster_id,
629 replica_id,
630 generation: replica_generation,
631 ..
632 } = event.service_id.parse()?;
633
634 let event = ClusterEvent {
635 cluster_id,
636 replica_id,
637 process_id: event.process_id,
638 status: event.status,
639 time: event.time,
640 };
641
642 Ok((event, replica_generation))
643 }
644
645 let stream = self
646 .orchestrator
647 .watch_services()
648 .map(|event| event.and_then(translate_event))
649 .filter_map(move |event| async move {
650 match event {
651 Ok((event, replica_generation)) => {
652 if replica_generation == deploy_generation {
653 Some(event)
654 } else {
655 None
656 }
657 }
658 Err(error) => {
659 error!("service watch error: {error}");
660 None
661 }
662 }
663 });
664
665 Box::pin(stream)
666 }
667
668 fn provision_replica(
670 &self,
671 cluster_id: ClusterId,
672 replica_id: ReplicaId,
673 cluster_name: String,
674 replica_name: String,
675 role: ClusterRole,
676 location: ManagedReplicaLocation,
677 enable_worker_core_affinity: bool,
678 enable_storage_introspection_logs: bool,
679 ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
680 let service_name = ReplicaServiceName {
681 cluster_id,
682 replica_id,
683 generation: self.deploy_generation,
684 }
685 .to_string();
686 let role_label = match role {
687 ClusterRole::SystemCritical => "system-critical",
688 ClusterRole::System => "system",
689 ClusterRole::User => "user",
690 };
691 let environment_id = self.connection_context().environment_id.clone();
692 let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
693 let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
694 let persist_pubsub_url = self.persist_pubsub_url.clone();
695 let secrets_args = self.secrets_args.to_flags();
696
697 let storage_proto_timely_config = TimelyConfig {
699 arrangement_exert_proportionality: 1337,
700 ..Default::default()
701 };
702 let compute_proto_timely_config = TimelyConfig {
703 arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
704 enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
705 enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
706 zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
707 ..Default::default()
708 };
709
710 let mut disk_limit = location.allocation.disk_limit;
711 let memory_limit = location.allocation.memory_limit;
712 let mut memory_request = None;
713
714 if location.allocation.swap_enabled {
715 disk_limit = Some(DiskLimit::ZERO);
719
720 memory_request = memory_limit.map(|MemoryLimit(limit)| {
724 let request = ByteSize::b(limit.as_u64() - 1);
725 MemoryLimit(request)
726 });
727 }
728
729 let service = self.orchestrator.ensure_service(
730 &service_name,
731 ServiceConfig {
732 image: self.clusterd_image.clone(),
733 init_container_image: self.init_container_image.clone(),
734 args: Box::new(move |assigned| {
735 let storage_timely_config = TimelyConfig {
736 workers: location.allocation.workers.get(),
737 addresses: assigned.peer_addresses("storage"),
738 ..storage_proto_timely_config
739 };
740 let compute_timely_config = TimelyConfig {
741 workers: location.allocation.workers.get(),
742 addresses: assigned.peer_addresses("compute"),
743 ..compute_proto_timely_config
744 };
745
746 let mut args = vec![
747 format!(
748 "--storage-controller-listen-addr={}",
749 assigned.listen_addrs["storagectl"]
750 ),
751 format!(
752 "--compute-controller-listen-addr={}",
753 assigned.listen_addrs["computectl"]
754 ),
755 format!(
756 "--internal-http-listen-addr={}",
757 assigned.listen_addrs["internal-http"]
758 ),
759 format!("--opentelemetry-resource=cluster_id={}", cluster_id),
760 format!("--opentelemetry-resource=replica_id={}", replica_id),
761 format!("--persist-pubsub-url={}", persist_pubsub_url),
762 format!("--environment-id={}", environment_id),
763 format!(
764 "--storage-timely-config={}",
765 storage_timely_config.to_string(),
766 ),
767 format!(
768 "--compute-timely-config={}",
769 compute_timely_config.to_string(),
770 ),
771 ];
772 if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
773 args.push(format!(
774 "--aws-external-id-prefix={}",
775 aws_external_id_prefix
776 ));
777 }
778 if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
779 args.push(format!(
780 "--aws-connection-role-arn={}",
781 aws_connection_role_arn
782 ));
783 }
784 if let Some(memory_limit) = location.allocation.memory_limit {
785 args.push(format!(
786 "--announce-memory-limit={}",
787 memory_limit.0.as_u64()
788 ));
789 }
790 if location.allocation.cpu_exclusive && enable_worker_core_affinity {
791 args.push("--worker-core-affinity".into());
792 }
793 if enable_storage_introspection_logs {
794 args.push("--enable-storage-introspection-logs".into());
795 }
796 if location.allocation.is_cc {
797 args.push("--is-cc".into());
798 }
799
800 if location.allocation.swap_enabled
803 && let Some(memory_limit) = location.allocation.memory_limit
804 && let Some(disk_limit) = location.allocation.disk_limit
805 && disk_limit != DiskLimit::ZERO
809 {
810 let heap_limit = memory_limit.0 + disk_limit.0;
811 args.push(format!("--heap-limit={}", heap_limit.as_u64()));
812 }
813
814 args.extend(secrets_args.clone());
815 args
816 }),
817 ports: vec![
818 ServicePort {
819 name: "storagectl".into(),
820 port_hint: 2100,
821 },
822 ServicePort {
826 name: "storage".into(),
827 port_hint: 2103,
828 },
829 ServicePort {
830 name: "computectl".into(),
831 port_hint: 2101,
832 },
833 ServicePort {
834 name: "compute".into(),
835 port_hint: 2102,
836 },
837 ServicePort {
838 name: "internal-http".into(),
839 port_hint: 6878,
840 },
841 ],
842 cpu_limit: location.allocation.cpu_limit,
843 cpu_request: location.allocation.cpu_request,
844 memory_limit,
845 memory_request,
846 scale: location.allocation.scale,
847 labels: BTreeMap::from([
848 ("replica-id".into(), replica_id.to_string()),
849 ("cluster-id".into(), cluster_id.to_string()),
850 ("generation".into(), self.deploy_generation.to_string()),
851 ("type".into(), "cluster".into()),
852 ("replica-role".into(), role_label.into()),
853 ("workers".into(), location.allocation.workers.to_string()),
854 (
855 "size".into(),
856 location
857 .size
858 .to_string()
859 .replace("=", "-")
860 .replace(",", "_"),
861 ),
862 ]),
863 annotations: BTreeMap::from([
864 (
865 "replica-name".into(),
866 format!("{cluster_name}.{replica_name}"),
867 ),
868 ("cluster-name".into(), cluster_name),
869 ]),
870 availability_zones: Some(location.availability_zones).filter(|azs| !azs.is_empty()),
873 other_replicas_selector: vec![
886 LabelSelector {
887 label_name: "cluster-id".to_string(),
888 logic: LabelSelectionLogic::Eq {
889 value: cluster_id.to_string(),
890 },
891 },
892 LabelSelector {
894 label_name: "replica-id".into(),
895 logic: LabelSelectionLogic::NotEq {
896 value: replica_id.to_string(),
897 },
898 },
899 LabelSelector {
900 label_name: "generation".into(),
901 logic: LabelSelectionLogic::Eq {
902 value: self.deploy_generation.to_string(),
903 },
904 },
905 ],
906 replicas_selector: vec![
907 LabelSelector {
908 label_name: "cluster-id".to_string(),
909 logic: LabelSelectionLogic::Eq {
911 value: cluster_id.to_string(),
912 },
913 },
914 LabelSelector {
915 label_name: "generation".into(),
916 logic: LabelSelectionLogic::Eq {
917 value: self.deploy_generation.to_string(),
918 },
919 },
920 ],
921 disk_limit,
922 node_selector: location.allocation.selectors,
923 },
924 )?;
925
926 let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
927 let tx = self.metrics_tx.clone();
928 let orchestrator = Arc::clone(&self.orchestrator);
929 let service_name = service_name.clone();
930 async move {
931 const METRICS_INTERVAL: Duration = Duration::from_secs(60);
932
933 let mut interval = tokio::time::interval(METRICS_INTERVAL);
941 loop {
942 interval.tick().await;
943 match orchestrator.fetch_service_metrics(&service_name).await {
944 Ok(metrics) => {
945 let _ = tx.send((replica_id, metrics));
946 }
947 Err(e) => {
948 warn!("failed to get metrics for replica {replica_id}: {e}");
949 }
950 }
951 }
952 }
953 });
954
955 Ok((service, metrics_task.abort_on_drop()))
956 }
957
958 fn deprovision_replica(
960 &self,
961 cluster_id: ClusterId,
962 replica_id: ReplicaId,
963 generation: u64,
964 ) -> Result<(), anyhow::Error> {
965 let service_name = ReplicaServiceName {
966 cluster_id,
967 replica_id,
968 generation,
969 }
970 .to_string();
971 self.orchestrator.drop_service(&service_name)
972 }
973}
974
975async fn try_remove_past_generation_replicas(
977 orchestrator: &dyn NamespacedOrchestrator,
978 deploy_generation: u64,
979) -> Result<(), anyhow::Error> {
980 let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
981
982 for service in services {
983 let name: ReplicaServiceName = service.parse()?;
984 if name.generation < deploy_generation {
985 info!(
986 cluster_id = %name.cluster_id,
987 replica_id = %name.replica_id,
988 "removing past generation replica",
989 );
990 orchestrator.drop_service(&service)?;
991 }
992 }
993
994 Ok(())
995}
996
/// The name of the orchestrator service that backs a replica.
///
/// Its string form is `{cluster_id}-replica-{replica_id}-gen-{generation}`. Values order
/// by cluster ID, then replica ID, then generation.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica the service backs.
    pub replica_id: ReplicaId,
    /// The deploy generation that created the service.
    pub generation: u64,
}
1004
1005impl fmt::Display for ReplicaServiceName {
1006 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1007 let ReplicaServiceName {
1008 cluster_id,
1009 replica_id,
1010 generation,
1011 } = self;
1012 write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
1013 }
1014}
1015
1016impl FromStr for ReplicaServiceName {
1017 type Err = anyhow::Error;
1018
1019 fn from_str(s: &str) -> Result<Self, Self::Err> {
1020 static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
1021 Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
1022 });
1023
1024 let caps = SERVICE_NAME_RE
1025 .captures(s)
1026 .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
1027
1028 Ok(ReplicaServiceName {
1029 cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
1030 replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
1031 generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
1035 })
1036 }
1037}