use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::num::NonZero;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::anyhow;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt};
use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
use mz_compute_client::controller::ComputeControllerTimestamp;
use mz_compute_client::logging::LogVariant;
use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
use mz_controller_types::dyncfgs::{
    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_orchestrator::{
    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, NamespacedOrchestrator,
    Service, ServiceConfig, ServiceEvent, ServicePort,
};
use mz_ore::cast::CastInto;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_ore::{halt, instrument};
use mz_repr::GlobalId;
use mz_repr::adt::numeric::Numeric;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::time;
use tracing::{error, info, warn};

use crate::Controller;

/// Configuration for a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to
    /// register the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional workload class, passed through to the storage and compute
    /// controllers (e.g., for metrics labeling).
    pub workload_class: Option<String>,
}

/// The status of a cluster, as reported by the orchestrator.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;

/// Configures a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica.
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}

/// Resource allocations for a replica of a cluster.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: NonZero<u16>,
    /// The number of worker threads in each process.
    pub workers: NonZero<usize>,
    /// The number of credits per hour that the replica consumes.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether the processes have exclusive access to their CPU cores. Only
    /// when this is set (and worker core affinity is enabled) are worker
    /// threads pinned to cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether the replica runs with the "cc" feature set; passed to the
    /// process as `--is-cc`. Defaults to `true`.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether the replica's processes may use swap instead of a scratch
    /// disk.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether this replica size is disabled for use.
    #[serde(default)]
    pub disabled: bool,
    /// Node selectors to apply when scheduling replica processes.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}

fn default_true() -> bool {
    true
}

#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;
    use mz_ore::{assert_err, assert_ok};

    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: NonZero::new(16).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: NonZero::new(1).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: Default::default(),
        }
    );

    // `scale` and `workers` must be non-zero.
    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
}

/// The location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica, whose processes the user is responsible for
    /// running and addressing.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica, whose processes are provisioned through the
    /// orchestrator.
    Managed(ManagedReplicaLocation),
}

impl ReplicaLocation {
    /// Returns the number of processes the replica consists of.
    pub fn num_processes(&self) -> usize {
        match self {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                computectl_addrs, ..
            }) => computectl_addrs.len(),
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                allocation.scale.cast_into()
            }
        }
    }

    /// Returns the size the replica is billed as, if it is billed differently
    /// from its actual size.
    pub fn billed_as(&self) -> Option<&str> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
                billed_as.as_deref()
            }
            _ => None,
        }
    }

    /// Returns whether the replica is internal.
    pub fn internal(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
            ReplicaLocation::Unmanaged(_) => false,
        }
    }

    /// Returns the total number of worker threads across all processes of the
    /// replica, if known. The worker count of unmanaged replicas is unknown.
    pub fn workers(&self) -> Option<usize> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                Some(allocation.workers.get() * self.num_processes())
            }
            ReplicaLocation::Unmanaged(_) => None,
        }
    }

    /// Returns whether the replica is pending. Unmanaged replicas are never
    /// pending.
    pub fn pending(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
            _ => false,
        }
    }
}
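
// A small test added for illustration: it exercises the accessors above for
// the unmanaged case, using only types defined in this file. The addresses
// are arbitrary examples.
#[mz_ore::test]
fn test_unmanaged_replica_location_accessors() {
    let location = ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
        storagectl_addrs: vec!["host-0:2100".into(), "host-1:2100".into()],
        computectl_addrs: vec!["host-0:2101".into(), "host-1:2101".into()],
    });
    // One process per computectl address.
    assert_eq!(location.num_processes(), 2);
    // Worker counts, billing, and the other managed-only properties are
    // absent or false for unmanaged replicas.
    assert_eq!(location.workers(), None);
    assert_eq!(location.billed_as(), None);
    assert!(!location.internal());
    assert!(!location.pending());
}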

/// The role of a cluster.
pub enum ClusterRole {
    /// A system cluster on which the correct operation of the system depends.
    SystemCritical,
    /// A system cluster that the system can tolerate losing, at least for a
    /// time.
    System,
    /// A user cluster.
    User,
}

/// The "location" of an unmanaged replica: network addresses at which its
/// already-running processes can be reached.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    pub computectl_addrs: Vec<String>,
}

/// Where a managed replica's availability zone configuration comes from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Inherit the availability zones specified on the cluster, if any.
    FromCluster(Option<Vec<String>>),
    /// Use an availability zone specified on the replica itself, if any.
    FromReplica(Option<String>),
}

/// The "location" of a managed replica, whose processes are provisioned
/// through the orchestrator.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The user-facing name of the replica's size.
    pub size: String,
    /// Whether the replica is internal.
    pub internal: bool,
    /// The size the replica is billed as, if different from `size`.
    pub billed_as: Option<String>,
    /// The availability zones the replica may be scheduled in.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending, rather than active.
    pub pending: bool,
}

impl ManagedReplicaLocation {
    /// Returns the size the replica should be billed as: `billed_as` if set,
    /// otherwise the actual `size`.
    pub fn size_for_billing(&self) -> &str {
        self.billed_as.as_deref().unwrap_or(&self.size)
    }
}
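
// A hedged example test (not in the original source) showing how `billed_as`
// overrides `size` for billing. The minimal allocation is obtained via serde,
// mirroring the deserialization test above; the size names are arbitrary.
#[mz_ore::test]
fn test_size_for_billing() {
    let allocation: ReplicaAllocation =
        serde_json::from_str(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#)
            .expect("valid replica allocation");

    let mut location = ManagedReplicaLocation {
        allocation,
        size: "small".into(),
        internal: false,
        billed_as: None,
        availability_zones: ManagedReplicaAvailabilityZones::FromCluster(None),
        pending: false,
    };

    // Without `billed_as`, billing falls back to the actual size.
    assert_eq!(location.size_for_billing(), "small");
    // With `billed_as`, billing uses the override.
    location.billed_as = Some("medium".into());
    assert_eq!(location.size_for_billing(), "medium");
}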

/// Logging configuration for a cluster replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;

/// An event reporting a status change of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub process_id: ProcessId,
    pub status: ClusterStatus,
    pub time: DateTime<Utc>,
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
{
    /// Creates a cluster with the specified identifier and configuration.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class of the specified cluster.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the specified cluster from both the storage and compute
    /// controllers.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster with the specified
    /// identifier and configuration.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                computectl_addrs,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    cluster_name,
                    replica_name,
                    role,
                    m,
                    enable_worker_core_affinity,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                };
                metrics_task = Some(metrics_task_join_handle);

                // Register the service with the HTTP locator, so the
                // replica's internal HTTP endpoints can be reached.
                self.replica_http_locator.register_replica(
                    cluster_id,
                    replica_id,
                    Arc::from(service),
                );
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops a replica of the specified cluster.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        self.replica_http_locator
            .remove_replica(cluster_id, replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Spawns a background task that removes replicas of past deploy
    /// generations from the orchestrator, retrying until it succeeds.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Removes orphaned replicas from the orchestrator: replicas of the
    /// current deploy generation that exist in the orchestrator but are
    /// unknown to the controller.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // Only consider replicas of the current generation; replicas of
            // past generations are cleaned up elsewhere.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // A replica with an ID at or past the next ID means we are
                // likely operating on stale catalog state; halt rather than
                // risk interfering with a newer deployment.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of events about status changes of cluster replica
    /// processes. Only events for replicas of the current deploy generation
    /// are reported.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        // Suppress events for replicas of other generations.
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a replica with the service orchestrator and spawns a task
    /// that periodically fetches the replica's metrics.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();

        let storage_proto_timely_config = TimelyConfig {
            // The storage side is not expected to consult this knob; 1337 is
            // an arbitrary sentinel.
            arrangement_exert_proportionality: 1337,
            ..Default::default()
        };
        let compute_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
            ..Default::default()
        };

        let mut disk_limit = location.allocation.disk_limit;
        let memory_limit = location.allocation.memory_limit;
        let mut memory_request = None;

        if location.allocation.swap_enabled {
            // Swap-enabled replicas don't get a scratch disk; swap takes its
            // place.
            disk_limit = Some(DiskLimit::ZERO);

            // Request one byte less memory than the limit. Keeping the
            // request below the limit avoids the Kubernetes `Guaranteed` QoS
            // class, which is excluded from swap usage.
            memory_request = memory_limit.map(|MemoryLimit(limit)| {
                let request = ByteSize::b(limit.as_u64() - 1);
                MemoryLimit(request)
            });
        }

        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    let storage_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("storage"),
                        ..storage_proto_timely_config
                    };
                    let compute_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("compute"),
                        ..compute_proto_timely_config
                    };

                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned.listen_addrs["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned.listen_addrs["computectl"]
                        ),
                        format!(
                            "--internal-http-listen-addr={}",
                            assigned.listen_addrs["internal-http"]
                        ),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                        format!("--storage-timely-config={}", storage_timely_config),
                        format!("--compute-timely-config={}", compute_timely_config),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    // If swap is enabled and the size also names a non-zero
                    // disk limit, announce a heap limit of memory + disk, so
                    // the process can size its heap to make use of swap.
                    if location.allocation.swap_enabled
                        && let Some(memory_limit) = location.allocation.memory_limit
                        && let Some(disk_limit) = location.allocation.disk_limit
                        && disk_limit != DiskLimit::ZERO
                    {
                        let heap_limit = memory_limit.0 + disk_limit.0;
                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
                    }

                    args.extend(secrets_args.clone());
                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                memory_limit,
                memory_request,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    (
                        "size".into(),
                        location
                            .size
                            .to_string()
                            .replace("=", "-")
                            .replace(",", "_"),
                    ),
                ]),
                annotations: BTreeMap::from([
                    ("replica-name".into(), replica_name),
                    ("cluster-name".into(), cluster_name),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // Select the other replicas of this cluster, e.g. to
                // configure anti-affinity between replicas of the same
                // cluster.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                // Select all replicas of this cluster.
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // Fetch the service's metrics on a fixed interval and forward
                // them to the controller.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica's service from the orchestrator.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}

/// Attempts to remove all replica services belonging to deploy generations
/// before the given `deploy_generation`.
async fn try_remove_past_generation_replicas(
    orchestrator: &dyn NamespacedOrchestrator,
    deploy_generation: u64,
) -> Result<(), anyhow::Error> {
    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();

    for service in services {
        let name: ReplicaServiceName = service.parse()?;
        if name.generation < deploy_generation {
            info!(
                cluster_id = %name.cluster_id,
                replica_id = %name.replica_id,
                "removing past generation replica",
            );
            orchestrator.drop_service(&service)?;
        }
    }

    Ok(())
}

/// The name of a cluster replica's service, as registered with the
/// orchestrator. Encodes the cluster ID, replica ID, and deploy generation.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub generation: u64,
}

impl fmt::Display for ReplicaServiceName {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } = self;
        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
    }
}

impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            // Names without a `-gen-` suffix (from before generations were
            // encoded) parse as generation 0.
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}
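
// A round-trip test added for illustration: `Display` and `FromStr` for
// `ReplicaServiceName` should invert each other, and legacy names without a
// `-gen-` suffix should parse as generation 0. This assumes `ClusterId` and
// `ReplicaId` parse their `u<n>`/`s<n>` display forms, as the regex above
// expects; the concrete IDs are arbitrary.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_replica_service_name_roundtrip() {
    let name: ReplicaServiceName = "u3-replica-u5-gen-7".parse().expect("valid service name");
    assert_eq!(name.generation, 7);
    assert_eq!(name.to_string(), "u3-replica-u5-gen-7");

    // Legacy names carry no generation suffix and default to generation 0.
    let legacy: ReplicaServiceName = "u3-replica-u5".parse().expect("valid legacy name");
    assert_eq!(legacy.generation, 0);

    // Anything else is rejected.
    assert!("not-a-replica".parse::<ReplicaServiceName>().is_err());
}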