1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::controller::ComputeControllerTimestamp;
26use mz_compute_client::logging::LogVariant;
27use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
28use mz_controller_types::dyncfgs::{
29 ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
30 ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
31};
32use mz_controller_types::{ClusterId, ReplicaId};
33use mz_orchestrator::NamespacedOrchestrator;
34use mz_orchestrator::{
35 CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
36 ServiceEvent, ServicePort,
37};
38use mz_ore::cast::CastInto;
39use mz_ore::task::{self, AbortOnDropHandle};
40use mz_ore::{halt, instrument};
41use mz_repr::GlobalId;
42use mz_repr::adt::numeric::Numeric;
43use regex::Regex;
44use serde::{Deserialize, Serialize};
45use tokio::time;
46use tracing::{error, info, warn};
47
48use crate::Controller;
49
/// Configuration for a cluster, supplied when the cluster is created.
pub struct ClusterConfig {
    /// The log variants to enable on the cluster, mapped to the `GlobalId`s
    /// of their arranged collections; forwarded to the compute controller on
    /// cluster creation.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional workload class, forwarded to both the storage and compute
    /// controllers on cluster creation.
    pub workload_class: Option<String>,
}
61
/// The status of a cluster replica process, as reported by the orchestrator.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
64
/// Configuration for a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// Where the replica's processes run (managed by the orchestrator, or
    /// unmanaged at caller-supplied addresses).
    pub location: ReplicaLocation,
    /// Compute-side configuration for the replica.
    pub compute: ComputeReplicaConfig,
}
73
/// Resource allocation for a replica of a managed cluster.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for the replica service.
    pub memory_limit: Option<MemoryLimit>,
    /// The memory request for the replica service. Overridden during
    /// provisioning when `swap_enabled` is set (see
    /// `Controller::provision_replica`).
    pub memory_request: Option<MemoryLimit>,
    /// The CPU limit for the replica service.
    pub cpu_limit: Option<CpuLimit>,
    /// The CPU request for the replica service.
    pub cpu_request: Option<CpuLimit>,
    /// The disk limit for the replica service. Forced to zero during
    /// provisioning when `swap_enabled` is set.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes that make up the replica.
    pub scale: NonZero<u16>,
    /// The number of Timely worker threads per process.
    pub workers: NonZero<usize>,
    /// The number of credits per hour this replica size costs. Deserialized
    /// from a decimal string (e.g. `"16"`), not a JSON number.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether workers may be pinned to exclusive CPU cores. Only takes
    /// effect when worker core affinity is also enabled at provisioning time.
    /// Defaults to `false`.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this is a "cc" size; when set, `--is-cc` is passed to the
    /// replica process. Defaults to `true`.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether swap is enabled for the replica. When set, provisioning zeroes
    /// the service disk limit and sets the memory request just below the
    /// memory limit. Defaults to `false`.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether this allocation is disabled. Defaults to `false`.
    // NOTE(review): not read anywhere in this file — presumably prevents the
    // size from being selected; confirm semantics at callers.
    #[serde(default)]
    pub disabled: bool,
    /// Node selectors for the replica service, forwarded to the orchestrator
    /// as the service's node selector. Defaults to empty.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
111
/// Serde default helper: yields `true` for boolean fields (e.g.
/// `ReplicaAllocation::is_cc`) that should default to enabled.
fn default_true() -> bool {
    true
}
115
// Tests that `ReplicaAllocation` deserializes correctly from JSON, including
// defaulted fields, zero-valued limits, and rejection of zero `scale`/`workers`.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;
    use mz_ore::{assert_err, assert_ok};

    // A fully specified allocation; omitted fields take their serde defaults.
    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            memory_request: None,
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_request: None,
            cpu_exclusive: false,
            // `is_cc` defaults to true when absent.
            is_cc: true,
            swap_enabled: true,
            scale: NonZero::new(16).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    // Zero-valued limits are valid; only `scale`/`workers` must be non-zero.
    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            memory_request: None,
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_request: None,
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: NonZero::new(1).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: Default::default(),
        }
    );

    // `NonZero` fields reject zero at deserialization time.
    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
}
205
/// The location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// A replica whose processes run outside the orchestrator, at
    /// caller-supplied addresses.
    Unmanaged(UnmanagedReplicaLocation),
    /// A replica whose processes are provisioned and managed by the
    /// orchestrator.
    Managed(ManagedReplicaLocation),
}
214
215impl ReplicaLocation {
216 pub fn num_processes(&self) -> usize {
218 match self {
219 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
220 computectl_addrs, ..
221 }) => computectl_addrs.len(),
222 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
223 allocation.scale.cast_into()
224 }
225 }
226 }
227
228 pub fn billed_as(&self) -> Option<&str> {
229 match self {
230 ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
231 billed_as.as_deref()
232 }
233 ReplicaLocation::Unmanaged(_) => None,
234 }
235 }
236
237 pub fn internal(&self) -> bool {
238 match self {
239 ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
240 ReplicaLocation::Unmanaged(_) => false,
241 }
242 }
243
244 pub fn workers(&self) -> Option<usize> {
248 match self {
249 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
250 Some(allocation.workers.get() * self.num_processes())
251 }
252 ReplicaLocation::Unmanaged(_) => None,
253 }
254 }
255
256 pub fn pending(&self) -> bool {
261 match self {
262 ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
263 ReplicaLocation::Unmanaged(_) => false,
264 }
265 }
266}
267
/// The role of a cluster, rendered as the `replica-role` label on the
/// orchestrator services of the cluster's replicas.
#[derive(Debug, Clone)]
pub enum ClusterRole {
    /// A system cluster critical to the operation of the system
    /// (labeled `system-critical`).
    SystemCritical,
    /// A non-critical system cluster (labeled `system`).
    System,
    /// A user cluster (labeled `user`).
    User,
}
283
/// The location of an unmanaged replica, i.e. one whose processes run
/// outside the orchestrator at caller-supplied addresses.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The `storagectl` endpoint addresses of the replica's processes.
    pub storagectl_addrs: Vec<String>,
    /// The `computectl` endpoint addresses of the replica's processes.
    pub computectl_addrs: Vec<String>,
}
294
/// The availability-zone specification for a managed replica.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Place the replica in the availability zones configured on the
    /// cluster, if any.
    FromCluster(Option<Vec<String>>),
    /// Place the replica in the single availability zone specified on the
    /// replica itself, if any.
    FromReplica(Option<String>),
}
306
/// The location of a managed replica, i.e. one whose processes are
/// provisioned and run by the orchestrator.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The name of the replica's size.
    pub size: String,
    /// Whether the replica is internal.
    // NOTE(review): only surfaced via `ReplicaLocation::internal` in this
    // file — confirm exact semantics at callers.
    pub internal: bool,
    /// If set, the size name used for billing instead of `size` (see
    /// `size_for_billing`).
    pub billed_as: Option<String>,
    /// The availability zone(s) the replica may be placed in. Not
    /// serialized.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending (see `ReplicaLocation::pending`).
    pub pending: bool,
}
336
337impl ManagedReplicaLocation {
338 pub fn size_for_billing(&self) -> &str {
340 self.billed_as.as_deref().unwrap_or(&self.size)
341 }
342}
343
/// Per-replica logging configuration (re-export of the compute type).
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;
349
/// A change in the status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The ID of the cluster.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The ID of the process within the replica.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// The time at which the status change occurred.
    pub time: DateTime<Utc>,
}
359
360impl<T> Controller<T>
361where
362 T: ComputeControllerTimestamp,
363{
364 pub fn create_cluster(
370 &mut self,
371 id: ClusterId,
372 config: ClusterConfig,
373 ) -> Result<(), anyhow::Error> {
374 self.storage
375 .create_instance(id, config.workload_class.clone());
376 self.compute
377 .create_instance(id, config.arranged_logs, config.workload_class)?;
378 Ok(())
379 }
380
381 pub fn update_cluster_workload_class(&mut self, id: ClusterId, workload_class: Option<String>) {
387 self.storage
388 .update_instance_workload_class(id, workload_class.clone());
389 self.compute
390 .update_instance_workload_class(id, workload_class)
391 .expect("instance exists");
392 }
393
394 pub fn drop_cluster(&mut self, id: ClusterId) {
400 self.storage.drop_instance(id);
401 self.compute.drop_instance(id);
402 }
403
404 pub fn create_replica(
407 &mut self,
408 cluster_id: ClusterId,
409 replica_id: ReplicaId,
410 cluster_name: String,
411 replica_name: String,
412 role: ClusterRole,
413 config: ReplicaConfig,
414 enable_worker_core_affinity: bool,
415 ) -> Result<(), anyhow::Error> {
416 let storage_location: ClusterReplicaLocation;
417 let compute_location: ClusterReplicaLocation;
418 let metrics_task: Option<AbortOnDropHandle<()>>;
419
420 match config.location {
421 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
422 storagectl_addrs,
423 computectl_addrs,
424 }) => {
425 compute_location = ClusterReplicaLocation {
426 ctl_addrs: computectl_addrs,
427 };
428 storage_location = ClusterReplicaLocation {
429 ctl_addrs: storagectl_addrs,
430 };
431 metrics_task = None;
432 }
433 ReplicaLocation::Managed(m) => {
434 let (service, metrics_task_join_handle) = self.provision_replica(
435 cluster_id,
436 replica_id,
437 cluster_name,
438 replica_name,
439 role,
440 m,
441 enable_worker_core_affinity,
442 )?;
443 storage_location = ClusterReplicaLocation {
444 ctl_addrs: service.addresses("storagectl"),
445 };
446 compute_location = ClusterReplicaLocation {
447 ctl_addrs: service.addresses("computectl"),
448 };
449 metrics_task = Some(metrics_task_join_handle);
450
451 let http_addresses = service.addresses("internal-http");
453 self.replica_http_locator
454 .register_replica(cluster_id, replica_id, http_addresses);
455 }
456 }
457
458 self.storage
459 .connect_replica(cluster_id, replica_id, storage_location);
460 self.compute.add_replica_to_instance(
461 cluster_id,
462 replica_id,
463 compute_location,
464 config.compute,
465 )?;
466
467 if let Some(task) = metrics_task {
468 self.metrics_tasks.insert(replica_id, task);
469 }
470
471 Ok(())
472 }
473
474 pub fn drop_replica(
476 &mut self,
477 cluster_id: ClusterId,
478 replica_id: ReplicaId,
479 ) -> Result<(), anyhow::Error> {
480 self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
485 self.metrics_tasks.remove(&replica_id);
486
487 self.replica_http_locator
489 .remove_replica(cluster_id, replica_id);
490
491 self.compute.drop_replica(cluster_id, replica_id)?;
492 self.storage.drop_replica(cluster_id, replica_id);
493 Ok(())
494 }
495
496 pub(crate) fn remove_past_generation_replicas_in_background(&self) {
498 let deploy_generation = self.deploy_generation;
499 let dyncfg = Arc::clone(self.compute.dyncfg());
500 let orchestrator = Arc::clone(&self.orchestrator);
501 task::spawn(
502 || "controller_remove_past_generation_replicas",
503 async move {
504 info!("attempting to remove past generation replicas");
505 loop {
506 match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
507 .await
508 {
509 Ok(()) => {
510 info!("successfully removed past generation replicas");
511 return;
512 }
513 Err(e) => {
514 let interval =
515 CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
516 .get(&dyncfg);
517 warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
518 time::sleep(interval).await;
519 }
520 }
521 }
522 },
523 );
524 }
525
526 #[instrument]
528 pub async fn remove_orphaned_replicas(
529 &mut self,
530 next_user_replica_id: u64,
531 next_system_replica_id: u64,
532 ) -> Result<(), anyhow::Error> {
533 let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();
534
535 let actual: BTreeSet<_> = self
536 .orchestrator
537 .list_services()
538 .await?
539 .iter()
540 .map(|s| ReplicaServiceName::from_str(s))
541 .collect::<Result<_, _>>()?;
542
543 for ReplicaServiceName {
544 cluster_id,
545 replica_id,
546 generation,
547 } in actual
548 {
549 if generation != self.deploy_generation {
553 continue;
554 }
555
556 let smaller_next = match replica_id {
557 ReplicaId::User(id) if id >= next_user_replica_id => {
558 Some(ReplicaId::User(next_user_replica_id))
559 }
560 ReplicaId::System(id) if id >= next_system_replica_id => {
561 Some(ReplicaId::System(next_system_replica_id))
562 }
563 _ => None,
564 };
565 if let Some(next) = smaller_next {
566 halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
571 }
572 if !desired.contains(&replica_id) {
573 self.deprovision_replica(cluster_id, replica_id, generation)?;
574 }
575 }
576
577 Ok(())
578 }
579
580 pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
581 let deploy_generation = self.deploy_generation;
582
583 fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
584 let ReplicaServiceName {
585 cluster_id,
586 replica_id,
587 generation: replica_generation,
588 ..
589 } = event.service_id.parse()?;
590
591 let event = ClusterEvent {
592 cluster_id,
593 replica_id,
594 process_id: event.process_id,
595 status: event.status,
596 time: event.time,
597 };
598
599 Ok((event, replica_generation))
600 }
601
602 let stream = self
603 .orchestrator
604 .watch_services()
605 .map(|event| event.and_then(translate_event))
606 .filter_map(move |event| async move {
607 match event {
608 Ok((event, replica_generation)) => {
609 if replica_generation == deploy_generation {
610 Some(event)
611 } else {
612 None
613 }
614 }
615 Err(error) => {
616 error!("service watch error: {error}");
617 None
618 }
619 }
620 });
621
622 Box::pin(stream)
623 }
624
625 fn provision_replica(
627 &self,
628 cluster_id: ClusterId,
629 replica_id: ReplicaId,
630 cluster_name: String,
631 replica_name: String,
632 role: ClusterRole,
633 location: ManagedReplicaLocation,
634 enable_worker_core_affinity: bool,
635 ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
636 let service_name = ReplicaServiceName {
637 cluster_id,
638 replica_id,
639 generation: self.deploy_generation,
640 }
641 .to_string();
642 let role_label = match role {
643 ClusterRole::SystemCritical => "system-critical",
644 ClusterRole::System => "system",
645 ClusterRole::User => "user",
646 };
647 let environment_id = self.connection_context().environment_id.clone();
648 let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
649 let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
650 let persist_pubsub_url = self.persist_pubsub_url.clone();
651 let secrets_args = self.secrets_args.to_flags();
652
653 let storage_proto_timely_config = TimelyConfig {
655 arrangement_exert_proportionality: 1337,
656 ..Default::default()
657 };
658 let compute_proto_timely_config = TimelyConfig {
659 arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
660 enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
661 enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
662 zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
663 ..Default::default()
664 };
665
666 let mut disk_limit = location.allocation.disk_limit;
667 let memory_limit = location.allocation.memory_limit;
668 let mut memory_request = None;
669
670 if location.allocation.swap_enabled {
671 disk_limit = Some(DiskLimit::ZERO);
675
676 memory_request = memory_limit.map(|MemoryLimit(limit)| {
680 let request = ByteSize::b(limit.as_u64() - 1);
681 MemoryLimit(request)
682 });
683 }
684
685 let service = self.orchestrator.ensure_service(
686 &service_name,
687 ServiceConfig {
688 image: self.clusterd_image.clone(),
689 init_container_image: self.init_container_image.clone(),
690 args: Box::new(move |assigned| {
691 let storage_timely_config = TimelyConfig {
692 workers: location.allocation.workers.get(),
693 addresses: assigned.peer_addresses("storage"),
694 ..storage_proto_timely_config
695 };
696 let compute_timely_config = TimelyConfig {
697 workers: location.allocation.workers.get(),
698 addresses: assigned.peer_addresses("compute"),
699 ..compute_proto_timely_config
700 };
701
702 let mut args = vec![
703 format!(
704 "--storage-controller-listen-addr={}",
705 assigned.listen_addrs["storagectl"]
706 ),
707 format!(
708 "--compute-controller-listen-addr={}",
709 assigned.listen_addrs["computectl"]
710 ),
711 format!(
712 "--internal-http-listen-addr={}",
713 assigned.listen_addrs["internal-http"]
714 ),
715 format!("--opentelemetry-resource=cluster_id={}", cluster_id),
716 format!("--opentelemetry-resource=replica_id={}", replica_id),
717 format!("--persist-pubsub-url={}", persist_pubsub_url),
718 format!("--environment-id={}", environment_id),
719 format!(
720 "--storage-timely-config={}",
721 storage_timely_config.to_string(),
722 ),
723 format!(
724 "--compute-timely-config={}",
725 compute_timely_config.to_string(),
726 ),
727 ];
728 if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
729 args.push(format!(
730 "--aws-external-id-prefix={}",
731 aws_external_id_prefix
732 ));
733 }
734 if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
735 args.push(format!(
736 "--aws-connection-role-arn={}",
737 aws_connection_role_arn
738 ));
739 }
740 if let Some(memory_limit) = location.allocation.memory_limit {
741 args.push(format!(
742 "--announce-memory-limit={}",
743 memory_limit.0.as_u64()
744 ));
745 }
746 if location.allocation.cpu_exclusive && enable_worker_core_affinity {
747 args.push("--worker-core-affinity".into());
748 }
749 if location.allocation.is_cc {
750 args.push("--is-cc".into());
751 }
752
753 if location.allocation.swap_enabled
756 && let Some(memory_limit) = location.allocation.memory_limit
757 && let Some(disk_limit) = location.allocation.disk_limit
758 && disk_limit != DiskLimit::ZERO
762 {
763 let heap_limit = memory_limit.0 + disk_limit.0;
764 args.push(format!("--heap-limit={}", heap_limit.as_u64()));
765 }
766
767 args.extend(secrets_args.clone());
768 args
769 }),
770 ports: vec![
771 ServicePort {
772 name: "storagectl".into(),
773 port_hint: 2100,
774 },
775 ServicePort {
779 name: "storage".into(),
780 port_hint: 2103,
781 },
782 ServicePort {
783 name: "computectl".into(),
784 port_hint: 2101,
785 },
786 ServicePort {
787 name: "compute".into(),
788 port_hint: 2102,
789 },
790 ServicePort {
791 name: "internal-http".into(),
792 port_hint: 6878,
793 },
794 ],
795 cpu_limit: location.allocation.cpu_limit,
796 cpu_request: location.allocation.cpu_request,
797 memory_limit,
798 memory_request,
799 scale: location.allocation.scale,
800 labels: BTreeMap::from([
801 ("replica-id".into(), replica_id.to_string()),
802 ("cluster-id".into(), cluster_id.to_string()),
803 ("type".into(), "cluster".into()),
804 ("replica-role".into(), role_label.into()),
805 ("workers".into(), location.allocation.workers.to_string()),
806 (
807 "size".into(),
808 location
809 .size
810 .to_string()
811 .replace("=", "-")
812 .replace(",", "_"),
813 ),
814 ]),
815 annotations: BTreeMap::from([
816 ("replica-name".into(), replica_name),
817 ("cluster-name".into(), cluster_name),
818 ]),
819 availability_zones: match location.availability_zones {
820 ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
821 ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
822 },
823 other_replicas_selector: vec![
827 LabelSelector {
828 label_name: "cluster-id".to_string(),
829 logic: LabelSelectionLogic::Eq {
830 value: cluster_id.to_string(),
831 },
832 },
833 LabelSelector {
835 label_name: "replica-id".into(),
836 logic: LabelSelectionLogic::NotEq {
837 value: replica_id.to_string(),
838 },
839 },
840 ],
841 replicas_selector: vec![LabelSelector {
842 label_name: "cluster-id".to_string(),
843 logic: LabelSelectionLogic::Eq {
845 value: cluster_id.to_string(),
846 },
847 }],
848 disk_limit,
849 node_selector: location.allocation.selectors,
850 },
851 )?;
852
853 let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
854 let tx = self.metrics_tx.clone();
855 let orchestrator = Arc::clone(&self.orchestrator);
856 let service_name = service_name.clone();
857 async move {
858 const METRICS_INTERVAL: Duration = Duration::from_secs(60);
859
860 let mut interval = tokio::time::interval(METRICS_INTERVAL);
868 loop {
869 interval.tick().await;
870 match orchestrator.fetch_service_metrics(&service_name).await {
871 Ok(metrics) => {
872 let _ = tx.send((replica_id, metrics));
873 }
874 Err(e) => {
875 warn!("failed to get metrics for replica {replica_id}: {e}");
876 }
877 }
878 }
879 }
880 });
881
882 Ok((service, metrics_task.abort_on_drop()))
883 }
884
885 fn deprovision_replica(
887 &self,
888 cluster_id: ClusterId,
889 replica_id: ReplicaId,
890 generation: u64,
891 ) -> Result<(), anyhow::Error> {
892 let service_name = ReplicaServiceName {
893 cluster_id,
894 replica_id,
895 generation,
896 }
897 .to_string();
898 self.orchestrator.drop_service(&service_name)
899 }
900}
901
902async fn try_remove_past_generation_replicas(
904 orchestrator: &dyn NamespacedOrchestrator,
905 deploy_generation: u64,
906) -> Result<(), anyhow::Error> {
907 let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
908
909 for service in services {
910 let name: ReplicaServiceName = service.parse()?;
911 if name.generation < deploy_generation {
912 info!(
913 cluster_id = %name.cluster_id,
914 replica_id = %name.replica_id,
915 "removing past generation replica",
916 );
917 orchestrator.drop_service(&service)?;
918 }
919 }
920
921 Ok(())
922}
923
/// The name of a replica's orchestrator service, encoding the cluster ID,
/// replica ID, and deploy generation.
///
/// Rendered as `{cluster_id}-replica-{replica_id}-gen-{generation}` by the
/// `Display` impl; the `FromStr` impl parses the same format back.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The deploy generation the replica was created in.
    pub generation: u64,
}
931
932impl fmt::Display for ReplicaServiceName {
933 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
934 let ReplicaServiceName {
935 cluster_id,
936 replica_id,
937 generation,
938 } = self;
939 write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
940 }
941}
942
impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    /// Parses a service name of the form
    /// `{cluster_id}-replica-{replica_id}[-gen-{generation}]`.
    ///
    /// The `-gen-N` suffix is optional; when absent the generation defaults
    /// to 0 (presumably for service names that predate deploy generations —
    /// TODO confirm).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `(?-u)` disables Unicode mode, so `\d` matches ASCII digits only.
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            // The `get`/`parse` unwraps rely on the regex shape: groups 1
            // and 2 are non-optional, and `[us]\d+` is the textual form of
            // cluster/replica IDs.
            // NOTE(review): a digit run exceeding the target integer type
            // would still make `parse` fail and panic here — confirm
            // service names are trusted input.
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}