1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::controller::ComputeControllerTimestamp;
26use mz_compute_client::logging::LogVariant;
27use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
28use mz_controller_types::dyncfgs::{
29 ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
30 ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
31};
32use mz_controller_types::{ClusterId, ReplicaId};
33use mz_orchestrator::NamespacedOrchestrator;
34use mz_orchestrator::{
35 CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
36 ServiceEvent, ServicePort,
37};
38use mz_ore::cast::CastInto;
39use mz_ore::task::{self, AbortOnDropHandle};
40use mz_ore::{halt, instrument};
41use mz_repr::GlobalId;
42use mz_repr::adt::numeric::Numeric;
43use regex::Regex;
44use serde::{Deserialize, Serialize};
45use tokio::time;
46use tracing::{error, info, warn};
47
48use crate::Controller;
49
/// Configuration for a cluster, as provided at cluster creation time.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to
    /// register the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional workload class, forwarded to both the storage and the
    /// compute controller when the cluster's instances are created.
    pub workload_class: Option<String>,
}
61
/// The status of a cluster replica process, as reported by the orchestrator.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
64
/// Configuration for a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica (managed by the orchestrator, or
    /// unmanaged at externally provided addresses).
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}
73
/// Resource allocations for a replica of a cluster.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The memory request for each process in the replica, if distinct from
    /// the limit.
    pub memory_request: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The CPU request for each process in the replica, if distinct from the
    /// limit.
    pub cpu_request: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica. Must be nonzero.
    pub scale: NonZero<u16>,
    /// The number of worker threads per process in the replica. Must be
    /// nonzero.
    pub workers: NonZero<usize>,
    /// The number of credits per hour this replica consumes, deserialized
    /// from a decimal string (e.g. `"16"`).
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process is granted exclusive access to its CPU cores
    /// (enables `--worker-core-affinity` when provisioning). Defaults to
    /// `false`.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this is a "cc" size, passed to `clusterd` via `--is-cc`.
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether swap is enabled for this replica; when set, provisioning
    /// zeroes the disk limit and requests slightly less memory than the
    /// limit (see `provision_replica`). Defaults to `false`.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether this allocation is disabled. Defaults to `false`.
    /// NOTE(review): not read within this file — confirm semantics at call
    /// sites.
    #[serde(default)]
    pub disabled: bool,
    /// Node selectors, forwarded to the orchestrator as the service's
    /// `node_selector`. Defaults to empty.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
111
/// Helper for `#[serde(default = "default_true")]` fields that default to
/// `true`.
fn default_true() -> bool {
    true
}
115
/// Tests that `ReplicaAllocation`s deserialize from JSON as expected,
/// including the defaults applied to omitted fields and the rejection of
/// zero `scale`/`workers` values.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;
    use mz_ore::{assert_err, assert_ok};

    // A mostly-specified allocation; omitted fields must take their
    // documented defaults (`cpu_exclusive: false`, `is_cc: true`,
    // `disabled: false`, no requests).
    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            memory_request: None,
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_request: None,
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: NonZero::new(16).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    // Zero-valued limits are accepted; explicit `cpu_exclusive` and
    // `disabled` override the defaults, and `swap_enabled`/`selectors`
    // default to `false`/empty when omitted.
    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            memory_request: None,
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_request: None,
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: NonZero::new(1).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: Default::default(),
        }
    );

    // `scale` and `workers` are `NonZero` types, so zero values must be
    // rejected at deserialization time.
    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
}
205
/// The location of a replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica, whose processes run outside the orchestrator's
    /// control at externally provided addresses.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica, provisioned and supervised by the orchestrator.
    Managed(ManagedReplicaLocation),
}
214
215impl ReplicaLocation {
216 pub fn num_processes(&self) -> usize {
218 match self {
219 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
220 computectl_addrs, ..
221 }) => computectl_addrs.len(),
222 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
223 allocation.scale.cast_into()
224 }
225 }
226 }
227
228 pub fn billed_as(&self) -> Option<&str> {
229 match self {
230 ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
231 billed_as.as_deref()
232 }
233 _ => None,
234 }
235 }
236
237 pub fn internal(&self) -> bool {
238 match self {
239 ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
240 ReplicaLocation::Unmanaged(_) => false,
241 }
242 }
243
244 pub fn workers(&self) -> Option<usize> {
248 match self {
249 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
250 Some(allocation.workers.get() * self.num_processes())
251 }
252 ReplicaLocation::Unmanaged(_) => None,
253 }
254 }
255
256 pub fn pending(&self) -> bool {
261 match self {
262 ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
263 _ => false,
264 }
265 }
266}
267
/// The role of a cluster, used to label its replica services (see
/// `provision_replica`).
pub enum ClusterRole {
    /// A system cluster that is critical to the operation of the system.
    SystemCritical,
    /// A non-critical system cluster.
    System,
    /// A user cluster.
    User,
}
282
/// The location of an unmanaged replica, i.e. one whose processes run
/// outside the orchestrator's control.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the `storagectl` endpoints, one per process.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the `computectl` endpoints, one per process.
    pub computectl_addrs: Vec<String>,
}
293
/// Specification of the availability zones for a managed replica.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Use the availability zones of the containing cluster, if any.
    FromCluster(Option<Vec<String>>),
    /// Use an availability zone specified on the replica itself, if any.
    FromReplica(Option<String>),
}
305
/// The location of a managed replica, i.e. one provisioned and supervised by
/// the orchestrator.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The name of the resource allocation (size) of the replica.
    pub size: String,
    /// Whether the replica is an internal replica.
    pub internal: bool,
    /// An optional size to bill the replica as, overriding `size` for
    /// billing purposes (see `size_for_billing`).
    pub billed_as: Option<String>,
    /// The availability zones to place the replica's processes in.
    ///
    /// Skipped during serialization: `ManagedReplicaAvailabilityZones` does
    /// not implement `Serialize`.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending.
    /// NOTE(review): presumably pending reconfiguration — confirm at call
    /// sites.
    pub pending: bool,
}
335
336impl ManagedReplicaLocation {
337 pub fn size_for_billing(&self) -> &str {
339 self.billed_as.as_deref().unwrap_or(&self.size)
340 }
341}
342
/// Logging configuration of a replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;
348
/// An event reporting a status change of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The index of the process within the replica.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// The time at which the status change was observed.
    pub time: DateTime<Utc>,
}
358
impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
{
    /// Creates a cluster with the specified ID and configuration.
    ///
    /// A cluster is a combination of a storage instance and a compute
    /// instance; both are created here, with the workload class forwarded to
    /// each.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class of the specified cluster in both the
    /// storage and the compute controller.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the specified cluster from both the storage and the compute
    /// controller.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster with the specified ID and
    /// configuration, connecting it to both the storage and the compute
    /// controller.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                computectl_addrs,
            }) => {
                // Unmanaged replicas already run elsewhere; we only record
                // the addresses at which to reach them. No metrics task is
                // spawned for them.
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                // Managed replicas are provisioned through the orchestrator,
                // which also yields the task that scrapes their metrics.
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    cluster_name,
                    replica_name,
                    role,
                    m,
                    enable_worker_core_affinity,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                };
                metrics_task = Some(metrics_task_join_handle);

                // Make the replica's internal HTTP endpoints discoverable.
                let http_addresses = service.addresses("internal-http");
                self.replica_http_locator
                    .register_replica(cluster_id, replica_id, http_addresses);
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops the specified replica of the specified cluster, undoing the
    /// registrations performed by `create_replica`.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // Deprovision the replica's service under the current deploy
        // generation. NOTE(review): this runs for unmanaged replicas too —
        // presumably `drop_service` tolerates unknown services; confirm.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        // Deregister the replica's internal HTTP endpoints.
        self.replica_http_locator
            .remove_replica(cluster_id, replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Spawns a background task that removes all replica services belonging
    /// to past deploy generations, retrying on failure until it succeeds.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            // Failures are retried on a dyncfg-controlled
                            // interval; they are expected to be transient
                            // (e.g. orchestrator unavailability).
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Removes replica services of the current deploy generation that the
    /// controller does not know about (i.e. that have no registered metrics
    /// task).
    ///
    /// Halts the process if the orchestrator reports a replica ID at or
    /// beyond the next IDs to allocate, since that indicates this
    /// controller's view of the world is stale.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        // The replicas this controller knows about: one metrics task is
        // registered per managed replica in `create_replica`.
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // Only consider replicas of our own generation. Past generations
            // are handled by `remove_past_generation_replicas_in_background`;
            // other generations must be left alone.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // An orchestrator replica ID >= the next ID to allocate means
                // someone else has been allocating replica IDs; halting is
                // safer than deprovisioning a replica we don't own.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of cluster events, filtered to replicas of the
    /// current deploy generation.
    ///
    /// Untranslatable service events are logged and dropped rather than
    /// terminating the stream.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        // Translates an orchestrator `ServiceEvent` into a `ClusterEvent`,
        // also extracting the replica's deploy generation for filtering
        // below.
        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        // Suppress events for replicas of other generations.
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a replica with the orchestrator and spawns a task that
    /// periodically scrapes the replica's metrics.
    ///
    /// Returns the provisioned service and the metrics task's handle, which
    /// aborts the task when dropped.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        // The service name encodes cluster ID, replica ID, and deploy
        // generation, so services of past generations can be identified
        // later (see `ReplicaServiceName`).
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();

        // Partial Timely configurations; `workers` and `addresses` are
        // filled in inside the `args` closure, once the orchestrator has
        // assigned addresses.
        let storage_proto_timely_config = TimelyConfig {
            // NOTE(review): apparently a placeholder value — presumably the
            // storage layer ignores exert proportionality; confirm.
            arrangement_exert_proportionality: 1337,
            ..Default::default()
        };
        let compute_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
            ..Default::default()
        };

        let mut disk_limit = location.allocation.disk_limit;
        let memory_limit = location.allocation.memory_limit;
        let mut memory_request = None;

        if location.allocation.swap_enabled {
            // With swap enabled the replica gets no scratch disk.
            disk_limit = Some(DiskLimit::ZERO);

            // Request one byte less than the limit.
            // NOTE(review): `limit.as_u64() - 1` underflows when the memory
            // limit is zero bytes — confirm zero limits cannot occur together
            // with `swap_enabled`.
            memory_request = memory_limit.map(|MemoryLimit(limit)| {
                let request = ByteSize::b(limit.as_u64() - 1);
                MemoryLimit(request)
            });
        }

        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    // Complete the Timely configurations with the worker
                    // count and the peer addresses assigned by the
                    // orchestrator.
                    let storage_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("storage"),
                        ..storage_proto_timely_config
                    };
                    let compute_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("compute"),
                        ..compute_proto_timely_config
                    };

                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned.listen_addrs["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned.listen_addrs["computectl"]
                        ),
                        format!(
                            "--internal-http-listen-addr={}",
                            assigned.listen_addrs["internal-http"]
                        ),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                        format!(
                            "--storage-timely-config={}",
                            storage_timely_config.to_string(),
                        ),
                        format!(
                            "--compute-timely-config={}",
                            compute_timely_config.to_string(),
                        ),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    // NOTE(review): the heap limit is derived from the
                    // *original* disk limit, not the zeroed local above —
                    // presumably intentional (memory + former scratch disk);
                    // confirm.
                    if location.allocation.swap_enabled
                        && let Some(memory_limit) = location.allocation.memory_limit
                        && let Some(disk_limit) = location.allocation.disk_limit
                        && disk_limit != DiskLimit::ZERO
                    {
                        let heap_limit = memory_limit.0 + disk_limit.0;
                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
                    }

                    args.extend(secrets_args.clone());
                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                cpu_request: location.allocation.cpu_request,
                memory_limit,
                memory_request,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    (
                        "size".into(),
                        // `=` and `,` are replaced because they are not
                        // valid in label values.
                        location
                            .size
                            .to_string()
                            .replace("=", "-")
                            .replace(",", "_"),
                    ),
                ]),
                annotations: BTreeMap::from([
                    ("replica-name".into(), replica_name),
                    ("cluster-name".into(), cluster_name),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // Replicas of the same cluster, excluding this replica
                // itself (e.g. for anti-affinity placement).
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                // All replicas of the same cluster, including this one.
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit,
                node_selector: location.allocation.selectors,
            },
        )?;

        // Spawn a task that periodically scrapes the replica's metrics from
        // the orchestrator and forwards them on the metrics channel.
        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            // Receiver going away is not an error; keep
                            // scraping until the task is aborted.
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica's service of the given deploy generation with
    /// the orchestrator.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}
900
901async fn try_remove_past_generation_replicas(
903 orchestrator: &dyn NamespacedOrchestrator,
904 deploy_generation: u64,
905) -> Result<(), anyhow::Error> {
906 let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
907
908 for service in services {
909 let name: ReplicaServiceName = service.parse()?;
910 if name.generation < deploy_generation {
911 info!(
912 cluster_id = %name.cluster_id,
913 replica_id = %name.replica_id,
914 "removing past generation replica",
915 );
916 orchestrator.drop_service(&service)?;
917 }
918 }
919
920 Ok(())
921}
922
923#[derive(PartialEq, Eq, PartialOrd, Ord)]
925pub struct ReplicaServiceName {
926 pub cluster_id: ClusterId,
927 pub replica_id: ReplicaId,
928 pub generation: u64,
929}
930
931impl fmt::Display for ReplicaServiceName {
932 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
933 let ReplicaServiceName {
934 cluster_id,
935 replica_id,
936 generation,
937 } = self;
938 write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
939 }
940}
941
impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    /// Parses a service name of the form `u1-replica-u2-gen-3` back into its
    /// components (see the `Display` implementation above).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // The `-gen-<n>` suffix is optional; names without it parse as
        // generation 0. NOTE(review): presumably for backward compatibility
        // with services created before generations were added — confirm.
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            // The unwraps rely on the regex restricting each capture to the
            // format the respective parser accepts. NOTE(review): a
            // generation value exceeding `u64::MAX` would still panic here.
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}