1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::logging::LogVariant;
26use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
27use mz_controller_types::dyncfgs::{
28 ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
29 ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
30};
31use mz_controller_types::{ClusterId, ReplicaId};
32use mz_orchestrator::NamespacedOrchestrator;
33use mz_orchestrator::{
34 CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
35 ServiceEvent, ServicePort,
36};
37use mz_ore::cast::CastInto;
38use mz_ore::task::{self, AbortOnDropHandle};
39use mz_ore::{halt, instrument};
40use mz_repr::GlobalId;
41use mz_repr::adt::numeric::Numeric;
42use regex::Regex;
43use serde::{Deserialize, Serialize};
44use tokio::time;
45use tracing::{error, info, warn};
46
47use crate::Controller;
48
/// Configuration supplied when creating a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance, each mapped to
    /// the `GlobalId` under which its arrangement is registered.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional workload class string; forwarded verbatim to both the
    /// storage and compute instances on creation.
    pub workload_class: Option<String>,
}
60
/// The status of a cluster; an alias for the orchestrator's service status.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
63
/// Configuration supplied when creating a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica (managed by the controller, or unmanaged
    /// with caller-supplied addresses).
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}
72
/// Resource allocations for a cluster replica, deserialized from the
/// replica-size definition and passed to the orchestrator when provisioning.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit passed to the orchestrator for the replica's service.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit passed to the orchestrator for the replica's service.
    pub cpu_limit: Option<CpuLimit>,
    /// The CPU request passed to the orchestrator; defaults to `None`.
    pub cpu_request: Option<CpuLimit>,
    /// The disk limit passed to the orchestrator. Overridden to zero when
    /// `swap_enabled` is set (see `Controller::provision_replica`).
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica (the orchestrator service
    /// scale); must be nonzero.
    pub scale: NonZero<u16>,
    /// The number of timely worker threads per process; must be nonzero.
    pub workers: NonZero<usize>,
    /// The number of credits per hour this replica size consumes, parsed
    /// from its string representation.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether workers may be pinned to CPU cores (`--worker-core-affinity`
    /// is only passed when this and the feature flag are both set).
    /// Defaults to `false`.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this is a "cc" size; forwarded as the `--is-cc` flag.
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether swap is enabled; when set, the disk limit is forced to zero
    /// and a memory request just below the memory limit is configured.
    /// Defaults to `false`.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether this allocation is disabled. Defaults to `false`.
    // NOTE(review): not read anywhere in this file — confirm semantics at the
    // callers that consume replica sizes.
    #[serde(default)]
    pub disabled: bool,
    /// Node selectors for this replica, passed to the orchestrator as the
    /// service's `node_selector`. Defaults to empty.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
108
/// Serde default helper: returns `true`, for boolean fields that should
/// default to enabled (e.g. `ReplicaAllocation::is_cc`).
fn default_true() -> bool {
    true
}
112
113#[mz_ore::test]
114#[cfg_attr(miri, ignore)] fn test_replica_allocation_deserialization() {
117 use bytesize::ByteSize;
118 use mz_ore::{assert_err, assert_ok};
119
120 let data = r#"
121 {
122 "cpu_limit": 1.0,
123 "memory_limit": "10GiB",
124 "disk_limit": "100MiB",
125 "scale": 16,
126 "workers": 1,
127 "credits_per_hour": "16",
128 "swap_enabled": true,
129 "selectors": {
130 "key1": "value1",
131 "key2": "value2"
132 }
133 }"#;
134
135 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
136 .expect("deserialization from JSON succeeds for ReplicaAllocation");
137
138 assert_eq!(
139 replica_allocation,
140 ReplicaAllocation {
141 credits_per_hour: 16.into(),
142 disk_limit: Some(DiskLimit(ByteSize::mib(100))),
143 disabled: false,
144 memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
145 cpu_limit: Some(CpuLimit::from_millicpus(1000)),
146 cpu_request: None,
147 cpu_exclusive: false,
148 is_cc: true,
149 swap_enabled: true,
150 scale: NonZero::new(16).unwrap(),
151 workers: NonZero::new(1).unwrap(),
152 selectors: BTreeMap::from([
153 ("key1".to_string(), "value1".to_string()),
154 ("key2".to_string(), "value2".to_string())
155 ]),
156 }
157 );
158
159 let data = r#"
160 {
161 "cpu_limit": 0,
162 "memory_limit": "0GiB",
163 "disk_limit": "0MiB",
164 "scale": 1,
165 "workers": 1,
166 "credits_per_hour": "0",
167 "cpu_exclusive": true,
168 "disabled": true
169 }"#;
170
171 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
172 .expect("deserialization from JSON succeeds for ReplicaAllocation");
173
174 assert_eq!(
175 replica_allocation,
176 ReplicaAllocation {
177 credits_per_hour: 0.into(),
178 disk_limit: Some(DiskLimit(ByteSize::mib(0))),
179 disabled: true,
180 memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
181 cpu_limit: Some(CpuLimit::from_millicpus(0)),
182 cpu_request: None,
183 cpu_exclusive: true,
184 is_cc: true,
185 swap_enabled: false,
186 scale: NonZero::new(1).unwrap(),
187 workers: NonZero::new(1).unwrap(),
188 selectors: Default::default(),
189 }
190 );
191
192 let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
194 assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
195 let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
196 assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
197 let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
198 assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
199}
200
/// The location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// A replica whose processes the user manages outside the orchestrator;
    /// the controller only connects to the given addresses.
    Unmanaged(UnmanagedReplicaLocation),
    /// A replica provisioned and managed by the controller via the
    /// orchestrator.
    Managed(ManagedReplicaLocation),
}
209
210impl ReplicaLocation {
211 pub fn num_processes(&self) -> usize {
213 match self {
214 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
215 computectl_addrs, ..
216 }) => computectl_addrs.len(),
217 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
218 allocation.scale.cast_into()
219 }
220 }
221 }
222
223 pub fn billed_as(&self) -> Option<&str> {
224 match self {
225 ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
226 billed_as.as_deref()
227 }
228 ReplicaLocation::Unmanaged(_) => None,
229 }
230 }
231
232 pub fn internal(&self) -> bool {
233 match self {
234 ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
235 ReplicaLocation::Unmanaged(_) => false,
236 }
237 }
238
239 pub fn workers(&self) -> Option<usize> {
243 match self {
244 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
245 Some(allocation.workers.get() * self.num_processes())
246 }
247 ReplicaLocation::Unmanaged(_) => None,
248 }
249 }
250
251 pub fn pending(&self) -> bool {
256 match self {
257 ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
258 ReplicaLocation::Unmanaged(_) => false,
259 }
260 }
261}
262
/// The role of a cluster, recorded as the `replica-role` label on the
/// replica services the controller provisions.
#[derive(Debug, Clone)]
pub enum ClusterRole {
    /// A system-critical cluster (labeled `system-critical`).
    SystemCritical,
    /// A system cluster (labeled `system`).
    System,
    /// A user cluster (labeled `user`).
    User,
}
278
/// The location of an unmanaged replica: the controller connects to
/// user-supplied addresses rather than provisioning anything itself.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The storage control addresses, one per process in the replica.
    pub storagectl_addrs: Vec<String>,
    /// The compute control addresses, one per process in the replica.
    /// The length of this list determines the replica's process count
    /// (see `ReplicaLocation::num_processes`).
    pub computectl_addrs: Vec<String>,
}
289
/// The availability-zone configuration of a managed replica, passed through
/// to the orchestrator when the replica's service is provisioned.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// The replica uses the availability zones (if any) configured on its
    /// cluster.
    FromCluster(Option<Vec<String>>),
    /// The replica is pinned to a specific availability zone, if one is set.
    FromReplica(Option<String>),
}
301
/// The location of a managed replica, which the controller provisions through
/// the orchestrator (see `Controller::provision_replica`).
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The name of the replica's size, used (sanitized) as the `size` label
    /// on the provisioned service.
    pub size: String,
    /// Whether this is an internal replica, surfaced via
    /// `ReplicaLocation::internal`.
    // NOTE(review): the meaning of "internal" is not visible in this file —
    // confirm semantics at the call sites.
    pub internal: bool,
    /// If set, the size this replica is billed as, overriding `size` in
    /// `size_for_billing`.
    pub billed_as: Option<String>,
    /// The availability zones the replica may be provisioned in.
    ///
    /// Skipped during serialization.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending, surfaced via
    /// `ReplicaLocation::pending`.
    // NOTE(review): what "pending" means is not visible in this file —
    // confirm semantics at the call sites.
    pub pending: bool,
}
331
332impl ManagedReplicaLocation {
333 pub fn size_for_billing(&self) -> &str {
335 self.billed_as.as_deref().unwrap_or(&self.size)
336 }
337}
338
/// Logging configuration of a replica; an alias for the compute-layer type.
pub type ReplicaLogging = ComputeReplicaLogging;
341
/// Identifier of a process within a replica.
pub type ProcessId = u64;
344
/// An event reporting a status change of a single cluster replica process,
/// as produced by `Controller::events_stream`.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica whose process changed status.
    pub replica_id: ReplicaId,
    /// The process within the replica that changed status.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// The time at which the status change occurred.
    pub time: DateTime<Utc>,
}
354
impl Controller {
    /// Creates a cluster with the specified identifier and configuration.
    ///
    /// Creates both a storage instance and a compute instance for the
    /// cluster, forwarding the configured workload class to each.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class of the given cluster on both the storage
    /// and compute controllers.
    ///
    /// # Panics
    ///
    /// Panics if the compute instance for `id` does not exist.
    pub fn update_cluster_workload_class(&mut self, id: ClusterId, workload_class: Option<String>) {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)
            .expect("instance exists");
    }

    /// Drops the specified cluster, removing its storage and compute
    /// instances.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster.
    ///
    /// For a managed replica this provisions a service through the
    /// orchestrator (spawning a metrics-polling task and registering the
    /// replica's internal HTTP addresses); for an unmanaged replica the
    /// caller-supplied addresses are used directly. In both cases the replica
    /// is then connected to the storage and compute controllers.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
        enable_storage_introspection_logs: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                computectl_addrs,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                };
                // No metrics task for unmanaged replicas: there is no
                // orchestrator service to fetch metrics from.
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    cluster_name,
                    replica_name,
                    role,
                    m,
                    enable_worker_core_affinity,
                    enable_storage_introspection_logs,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                };
                metrics_task = Some(metrics_task_join_handle);

                // Make the replica's internal HTTP endpoints discoverable.
                let http_addresses = service.addresses("internal-http");
                self.replica_http_locator
                    .register_replica(cluster_id, replica_id, http_addresses);
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops the specified replica of the specified cluster.
    ///
    /// Deprovisions the replica's orchestrator service (for the current
    /// deploy generation), stops its metrics task, removes it from the HTTP
    /// locator, and drops it from the compute and storage controllers.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // Deprovisioning is attempted unconditionally, i.e. also for
        // unmanaged replicas that were never provisioned.
        // NOTE(review): presumably dropping a nonexistent service is a no-op
        // in the orchestrator — confirm.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        self.replica_http_locator
            .remove_replica(cluster_id, replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Spawns a background task that removes replica services belonging to
    /// past deploy generations, retrying forever on failure at the interval
    /// configured by
    /// `CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL`.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            // Re-read the interval each time so dyncfg
                            // changes take effect without a restart.
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Removes from the orchestrator any current-generation replica services
    /// whose replica IDs are not known to the controller.
    ///
    /// `next_user_replica_id` and `next_system_replica_id` are the IDs that
    /// will be allocated next; encountering a service with an ID at or beyond
    /// these bounds indicates an irreconcilable inconsistency and halts the
    /// process.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        // The replicas the controller knows about, keyed off the metrics
        // tasks (one is registered for every managed replica on creation).
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // Only the current generation is handled here; services of
            // earlier generations are cleaned up by
            // `remove_past_generation_replicas_in_background`.
            if generation != self.deploy_generation {
                continue;
            }

            // Sanity check: a live service must never carry an ID at or
            // beyond the next ID the catalog will allocate.
            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of replica status events.
    ///
    /// Events from services of other deploy generations are filtered out;
    /// watch errors are logged and swallowed.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        // Translate an orchestrator service event into a `ClusterEvent` plus
        // the deploy generation parsed from the service name.
        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a managed replica with the orchestrator.
    ///
    /// Ensures a service running `clusterd` exists for the replica and spawns
    /// a task that fetches the service's metrics every 60 seconds, forwarding
    /// them on `metrics_tx`. Returns the service handle together with the
    /// (abort-on-drop) metrics task handle.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
        enable_storage_introspection_logs: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        // The service name encodes cluster, replica, and deploy generation;
        // see `ReplicaServiceName`.
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();

        // Prototype timely configs; `workers` and `addresses` are filled in
        // inside the args closure once port assignments are known.
        let storage_proto_timely_config = TimelyConfig {
            // NOTE(review): fixed placeholder rather than the
            // `ARRANGEMENT_EXERT_PROPORTIONALITY` dyncfg used for compute —
            // confirm the storage runtime ignores this setting.
            arrangement_exert_proportionality: 1337,
            ..Default::default()
        };
        let compute_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
            ..Default::default()
        };

        let mut disk_limit = location.allocation.disk_limit;
        let memory_limit = location.allocation.memory_limit;
        let mut memory_request = None;

        if location.allocation.swap_enabled {
            // With swap enabled the scratch-disk limit is forced to zero.
            // NOTE(review): confirm the intended interaction between swap and
            // scratch disk.
            disk_limit = Some(DiskLimit::ZERO);

            // Request one byte less memory than the limit.
            // NOTE(review): underflows if a zero-byte memory limit is
            // configured (`as_u64() - 1`) — confirm limits are always nonzero
            // on this path, and confirm the rationale for limit-minus-one.
            memory_request = memory_limit.map(|MemoryLimit(limit)| {
                let request = ByteSize::b(limit.as_u64() - 1);
                MemoryLimit(request)
            });
        }

        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    // Complete the timely configs with worker counts and the
                    // per-process peer addresses assigned by the
                    // orchestrator.
                    let storage_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("storage"),
                        ..storage_proto_timely_config
                    };
                    let compute_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("compute"),
                        ..compute_proto_timely_config
                    };

                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned.listen_addrs["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned.listen_addrs["computectl"]
                        ),
                        format!(
                            "--internal-http-listen-addr={}",
                            assigned.listen_addrs["internal-http"]
                        ),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                        format!(
                            "--storage-timely-config={}",
                            storage_timely_config.to_string(),
                        ),
                        format!(
                            "--compute-timely-config={}",
                            compute_timely_config.to_string(),
                        ),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if enable_storage_introspection_logs {
                        args.push("--enable-storage-introspection-logs".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    // With swap enabled and a nonzero disk limit configured,
                    // announce a heap limit of memory plus disk.
                    if location.allocation.swap_enabled
                        && let Some(memory_limit) = location.allocation.memory_limit
                        && let Some(disk_limit) = location.allocation.disk_limit
                        && disk_limit != DiskLimit::ZERO
                    {
                        let heap_limit = memory_limit.0 + disk_limit.0;
                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
                    }

                    args.extend(secrets_args.clone());
                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                cpu_request: location.allocation.cpu_request,
                memory_limit,
                memory_request,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    (
                        "size".into(),
                        // `=` and `,` are rewritten, presumably to produce a
                        // valid label value.
                        location
                            .size
                            .to_string()
                            .replace("=", "-")
                            .replace(",", "_"),
                    ),
                ]),
                annotations: BTreeMap::from([
                    ("replica-name".into(), replica_name),
                    ("cluster-name".into(), cluster_name),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // Selects all services of this cluster except this replica
                // itself.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                // Selects all services of this cluster, including this
                // replica.
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // NOTE(review): `tokio::time::interval` defaults to bursting
                // to catch up on missed ticks — confirm that is acceptable
                // here, or set a `MissedTickBehavior` explicitly.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            // Send errors are ignored: the receiver may have
                            // hung up.
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica by dropping the orchestrator service named for
    /// the given cluster, replica, and deploy generation.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}
899
900async fn try_remove_past_generation_replicas(
902 orchestrator: &dyn NamespacedOrchestrator,
903 deploy_generation: u64,
904) -> Result<(), anyhow::Error> {
905 let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
906
907 for service in services {
908 let name: ReplicaServiceName = service.parse()?;
909 if name.generation < deploy_generation {
910 info!(
911 cluster_id = %name.cluster_id,
912 replica_id = %name.replica_id,
913 "removing past generation replica",
914 );
915 orchestrator.drop_service(&service)?;
916 }
917 }
918
919 Ok(())
920}
921
/// The structured name of a replica's orchestrator service, encoding the
/// cluster, replica, and deploy generation.
///
/// Rendered and parsed in the `<cluster>-replica-<replica>-gen-<generation>`
/// format by the `Display` and `FromStr` impls below. The derived `Ord`
/// sorts by (cluster, replica, generation).
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The deploy generation the replica was provisioned under.
    pub generation: u64,
}
929
930impl fmt::Display for ReplicaServiceName {
931 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
932 let ReplicaServiceName {
933 cluster_id,
934 replica_id,
935 generation,
936 } = self;
937 write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
938 }
939}
940
941impl FromStr for ReplicaServiceName {
942 type Err = anyhow::Error;
943
944 fn from_str(s: &str) -> Result<Self, Self::Err> {
945 static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
946 Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
947 });
948
949 let caps = SERVICE_NAME_RE
950 .captures(s)
951 .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
952
953 Ok(ReplicaServiceName {
954 cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
955 replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
956 generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
960 })
961 }
962}