1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::controller::ComputeControllerTimestamp;
26use mz_compute_client::logging::LogVariant;
27use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
28use mz_controller_types::dyncfgs::{
29 ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
30 ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
31};
32use mz_controller_types::{ClusterId, ReplicaId};
33use mz_orchestrator::NamespacedOrchestrator;
34use mz_orchestrator::{
35 CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
36 ServiceEvent, ServicePort,
37};
38use mz_ore::cast::CastInto;
39use mz_ore::task::{self, AbortOnDropHandle};
40use mz_ore::{halt, instrument};
41use mz_repr::GlobalId;
42use mz_repr::adt::numeric::Numeric;
43use regex::Regex;
44use serde::{Deserialize, Serialize};
45use tokio::time;
46use tracing::{error, info, warn};
47
48use crate::Controller;
49
/// Configuration used when creating a cluster.
pub struct ClusterConfig {
    /// Log variants mapped to the [`GlobalId`] associated with each; handed to
    /// the compute controller when the cluster's compute instance is created.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// Optional workload class, forwarded to both the storage and the compute
    /// controller.
    pub workload_class: Option<String>,
}
61
/// Status of a cluster replica process; an alias for the orchestrator's
/// service status.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
64
/// Configuration of a single cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// Where the replica runs (managed by the orchestrator, or unmanaged).
    pub location: ReplicaLocation,
    /// Compute-specific configuration, passed to the compute controller when
    /// the replica is added to its instance.
    pub compute: ComputeReplicaConfig,
}
73
/// Resource allocation of a managed cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// Memory limit handed to the orchestrator service; also announced to the
    /// replica process via `--announce-memory-limit`.
    pub memory_limit: Option<MemoryLimit>,
    /// CPU limit handed to the orchestrator service.
    pub cpu_limit: Option<CpuLimit>,
    /// Disk limit handed to the orchestrator service (overridden with zero
    /// when `swap_enabled` is set).
    pub disk_limit: Option<DiskLimit>,
    /// Number of processes of the replica; must be non-zero.
    pub scale: NonZero<u16>,
    /// Number of Timely workers per process; must be non-zero.
    pub workers: NonZero<usize>,
    /// Credits consumed per hour. Deserialized from a string representation.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// When set (and worker core affinity is enabled by the caller), the
    /// replica process is started with `--worker-core-affinity`.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// When set, the replica process is started with `--is-cc`. Defaults to
    /// `true` when absent from the serialized form.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether the replica is provisioned for swap: the disk limit is forced
    /// to zero and a memory request just below the memory limit is set.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether this allocation is disabled.
    // NOTE(review): not consumed anywhere in this file; presumably checked by
    // callers before creating replicas of this size — confirm.
    #[serde(default)]
    pub disabled: bool,
    /// Key/value selectors passed to the orchestrator as the service's node
    /// selector.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
107
/// Serde default provider returning `true`; used for
/// `ReplicaAllocation::is_cc` when the field is absent.
fn default_true() -> bool {
    true
}
111
112#[mz_ore::test]
113#[cfg_attr(miri, ignore)] fn test_replica_allocation_deserialization() {
116 use bytesize::ByteSize;
117 use mz_ore::{assert_err, assert_ok};
118
119 let data = r#"
120 {
121 "cpu_limit": 1.0,
122 "memory_limit": "10GiB",
123 "disk_limit": "100MiB",
124 "scale": 16,
125 "workers": 1,
126 "credits_per_hour": "16",
127 "swap_enabled": true,
128 "selectors": {
129 "key1": "value1",
130 "key2": "value2"
131 }
132 }"#;
133
134 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
135 .expect("deserialization from JSON succeeds for ReplicaAllocation");
136
137 assert_eq!(
138 replica_allocation,
139 ReplicaAllocation {
140 credits_per_hour: 16.into(),
141 disk_limit: Some(DiskLimit(ByteSize::mib(100))),
142 disabled: false,
143 memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
144 cpu_limit: Some(CpuLimit::from_millicpus(1000)),
145 cpu_exclusive: false,
146 is_cc: true,
147 swap_enabled: true,
148 scale: NonZero::new(16).unwrap(),
149 workers: NonZero::new(1).unwrap(),
150 selectors: BTreeMap::from([
151 ("key1".to_string(), "value1".to_string()),
152 ("key2".to_string(), "value2".to_string())
153 ]),
154 }
155 );
156
157 let data = r#"
158 {
159 "cpu_limit": 0,
160 "memory_limit": "0GiB",
161 "disk_limit": "0MiB",
162 "scale": 1,
163 "workers": 1,
164 "credits_per_hour": "0",
165 "cpu_exclusive": true,
166 "disabled": true
167 }"#;
168
169 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
170 .expect("deserialization from JSON succeeds for ReplicaAllocation");
171
172 assert_eq!(
173 replica_allocation,
174 ReplicaAllocation {
175 credits_per_hour: 0.into(),
176 disk_limit: Some(DiskLimit(ByteSize::mib(0))),
177 disabled: true,
178 memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
179 cpu_limit: Some(CpuLimit::from_millicpus(0)),
180 cpu_exclusive: true,
181 is_cc: true,
182 swap_enabled: false,
183 scale: NonZero::new(1).unwrap(),
184 workers: NonZero::new(1).unwrap(),
185 selectors: Default::default(),
186 }
187 );
188
189 let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
191 assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
192 let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
193 assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
194 let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
195 assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
196}
197
/// Where a cluster replica runs.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// The replica's processes are not provisioned by the controller; their
    /// controller addresses are supplied directly.
    Unmanaged(UnmanagedReplicaLocation),
    /// The replica is provisioned through the orchestrator.
    Managed(ManagedReplicaLocation),
}
206
207impl ReplicaLocation {
208 pub fn num_processes(&self) -> usize {
210 match self {
211 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
212 computectl_addrs, ..
213 }) => computectl_addrs.len(),
214 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
215 allocation.scale.cast_into()
216 }
217 }
218 }
219
220 pub fn billed_as(&self) -> Option<&str> {
221 match self {
222 ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
223 billed_as.as_deref()
224 }
225 _ => None,
226 }
227 }
228
229 pub fn internal(&self) -> bool {
230 match self {
231 ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
232 ReplicaLocation::Unmanaged(_) => false,
233 }
234 }
235
236 pub fn workers(&self) -> Option<usize> {
240 match self {
241 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
242 Some(allocation.workers.get() * self.num_processes())
243 }
244 ReplicaLocation::Unmanaged(_) => None,
245 }
246 }
247
248 pub fn pending(&self) -> bool {
253 match self {
254 ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
255 _ => false,
256 }
257 }
258}
259
/// The role of a cluster, attached to provisioned replicas as the
/// `replica-role` label.
pub enum ClusterRole {
    /// Labelled `system-critical`.
    SystemCritical,
    /// Labelled `system`.
    System,
    /// Labelled `user`.
    User,
}
274
/// Location of a replica whose processes are not provisioned by the
/// controller.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// Addresses the storage controller connects to.
    pub storagectl_addrs: Vec<String>,
    /// Addresses the compute controller connects to; their count also
    /// determines the replica's number of processes.
    pub computectl_addrs: Vec<String>,
}
285
/// Availability zone constraints of a managed replica.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// An optional list of availability zones, passed to the orchestrator
    /// unchanged.
    FromCluster(Option<Vec<String>>),
    /// An optional single availability zone, passed to the orchestrator as a
    /// one-element list.
    FromReplica(Option<String>),
}
297
/// Location of a replica provisioned through the orchestrator.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// Resource allocation of the replica.
    pub allocation: ReplicaAllocation,
    /// Name of the replica size; also attached (sanitized) as the `size`
    /// label of the provisioned service.
    pub size: String,
    /// Whether the replica is internal.
    pub internal: bool,
    /// Optional size to bill the replica as, overriding `size` for billing.
    pub billed_as: Option<String>,
    /// Availability zone constraints. Excluded from serialization.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending.
    pub pending: bool,
}
327
328impl ManagedReplicaLocation {
329 pub fn size_for_billing(&self) -> &str {
331 self.billed_as.as_deref().unwrap_or(&self.size)
332 }
333}
334
/// Logging configuration of a replica; an alias for the compute replica
/// logging configuration.
pub type ReplicaLogging = ComputeReplicaLogging;
337
/// Identifier of a process within a replica, as carried by [`ClusterEvent`].
pub type ProcessId = u64;
340
/// A status change of a replica process, translated from an orchestrator
/// service event.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// Cluster the replica belongs to (parsed from the service name).
    pub cluster_id: ClusterId,
    /// Replica the process belongs to (parsed from the service name).
    pub replica_id: ReplicaId,
    /// Process the event refers to.
    pub process_id: ProcessId,
    /// Status reported by the orchestrator.
    pub status: ClusterStatus,
    /// Timestamp reported by the orchestrator for the event.
    pub time: DateTime<Utc>,
}
350
351impl<T> Controller<T>
352where
353 T: ComputeControllerTimestamp,
354{
355 pub fn create_cluster(
361 &mut self,
362 id: ClusterId,
363 config: ClusterConfig,
364 ) -> Result<(), anyhow::Error> {
365 self.storage
366 .create_instance(id, config.workload_class.clone());
367 self.compute
368 .create_instance(id, config.arranged_logs, config.workload_class)?;
369 Ok(())
370 }
371
372 pub fn update_cluster_workload_class(
374 &mut self,
375 id: ClusterId,
376 workload_class: Option<String>,
377 ) -> Result<(), anyhow::Error> {
378 self.storage
379 .update_instance_workload_class(id, workload_class.clone());
380 self.compute
381 .update_instance_workload_class(id, workload_class)?;
382 Ok(())
383 }
384
    /// Drops the cluster with the given ID by dropping its instance from the
    /// storage controller and then from the compute controller.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }
394
395 pub fn create_replica(
398 &mut self,
399 cluster_id: ClusterId,
400 replica_id: ReplicaId,
401 cluster_name: String,
402 replica_name: String,
403 role: ClusterRole,
404 config: ReplicaConfig,
405 enable_worker_core_affinity: bool,
406 ) -> Result<(), anyhow::Error> {
407 let storage_location: ClusterReplicaLocation;
408 let compute_location: ClusterReplicaLocation;
409 let metrics_task: Option<AbortOnDropHandle<()>>;
410
411 match config.location {
412 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
413 storagectl_addrs,
414 computectl_addrs,
415 }) => {
416 compute_location = ClusterReplicaLocation {
417 ctl_addrs: computectl_addrs,
418 };
419 storage_location = ClusterReplicaLocation {
420 ctl_addrs: storagectl_addrs,
421 };
422 metrics_task = None;
423 }
424 ReplicaLocation::Managed(m) => {
425 let (service, metrics_task_join_handle) = self.provision_replica(
426 cluster_id,
427 replica_id,
428 cluster_name,
429 replica_name,
430 role,
431 m,
432 enable_worker_core_affinity,
433 )?;
434 storage_location = ClusterReplicaLocation {
435 ctl_addrs: service.addresses("storagectl"),
436 };
437 compute_location = ClusterReplicaLocation {
438 ctl_addrs: service.addresses("computectl"),
439 };
440 metrics_task = Some(metrics_task_join_handle);
441 }
442 }
443
444 self.storage
445 .connect_replica(cluster_id, replica_id, storage_location);
446 self.compute.add_replica_to_instance(
447 cluster_id,
448 replica_id,
449 compute_location,
450 config.compute,
451 )?;
452
453 if let Some(task) = metrics_task {
454 self.metrics_tasks.insert(replica_id, task);
455 }
456
457 Ok(())
458 }
459
    /// Drops the specified replica of the specified cluster.
    ///
    /// # Errors
    ///
    /// Returns an error if deprovisioning the replica's service fails or if
    /// the compute controller fails to drop the replica. In the first case
    /// nothing else has been changed yet.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // Deprovision the service of the current deploy generation. This is
        // also attempted for replicas that have no entry in `metrics_tasks`.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        // Dropping the removed `AbortOnDropHandle` stops metrics collection
        // for the replica (presumably by aborting the task — see `mz_ore::task`).
        self.metrics_tasks.remove(&replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }
477
478 pub(crate) fn remove_past_generation_replicas_in_background(&self) {
480 let deploy_generation = self.deploy_generation;
481 let dyncfg = Arc::clone(self.compute.dyncfg());
482 let orchestrator = Arc::clone(&self.orchestrator);
483 task::spawn(
484 || "controller_remove_past_generation_replicas",
485 async move {
486 info!("attempting to remove past generation replicas");
487 loop {
488 match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
489 .await
490 {
491 Ok(()) => {
492 info!("successfully removed past generation replicas");
493 return;
494 }
495 Err(e) => {
496 let interval =
497 CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
498 .get(&dyncfg);
499 warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
500 time::sleep(interval).await;
501 }
502 }
503 }
504 },
505 );
506 }
507
508 #[instrument]
510 pub async fn remove_orphaned_replicas(
511 &mut self,
512 next_user_replica_id: u64,
513 next_system_replica_id: u64,
514 ) -> Result<(), anyhow::Error> {
515 let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();
516
517 let actual: BTreeSet<_> = self
518 .orchestrator
519 .list_services()
520 .await?
521 .iter()
522 .map(|s| ReplicaServiceName::from_str(s))
523 .collect::<Result<_, _>>()?;
524
525 for ReplicaServiceName {
526 cluster_id,
527 replica_id,
528 generation,
529 } in actual
530 {
531 if generation != self.deploy_generation {
535 continue;
536 }
537
538 let smaller_next = match replica_id {
539 ReplicaId::User(id) if id >= next_user_replica_id => {
540 Some(ReplicaId::User(next_user_replica_id))
541 }
542 ReplicaId::System(id) if id >= next_system_replica_id => {
543 Some(ReplicaId::System(next_system_replica_id))
544 }
545 _ => None,
546 };
547 if let Some(next) = smaller_next {
548 halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
553 }
554 if !desired.contains(&replica_id) {
555 self.deprovision_replica(cluster_id, replica_id, generation)?;
556 }
557 }
558
559 Ok(())
560 }
561
562 pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
563 let deploy_generation = self.deploy_generation;
564
565 fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
566 let ReplicaServiceName {
567 cluster_id,
568 replica_id,
569 generation: replica_generation,
570 ..
571 } = event.service_id.parse()?;
572
573 let event = ClusterEvent {
574 cluster_id,
575 replica_id,
576 process_id: event.process_id,
577 status: event.status,
578 time: event.time,
579 };
580
581 Ok((event, replica_generation))
582 }
583
584 let stream = self
585 .orchestrator
586 .watch_services()
587 .map(|event| event.and_then(translate_event))
588 .filter_map(move |event| async move {
589 match event {
590 Ok((event, replica_generation)) => {
591 if replica_generation == deploy_generation {
592 Some(event)
593 } else {
594 None
595 }
596 }
597 Err(error) => {
598 error!("service watch error: {error}");
599 None
600 }
601 }
602 });
603
604 Box::pin(stream)
605 }
606
607 fn provision_replica(
609 &self,
610 cluster_id: ClusterId,
611 replica_id: ReplicaId,
612 cluster_name: String,
613 replica_name: String,
614 role: ClusterRole,
615 location: ManagedReplicaLocation,
616 enable_worker_core_affinity: bool,
617 ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
618 let service_name = ReplicaServiceName {
619 cluster_id,
620 replica_id,
621 generation: self.deploy_generation,
622 }
623 .to_string();
624 let role_label = match role {
625 ClusterRole::SystemCritical => "system-critical",
626 ClusterRole::System => "system",
627 ClusterRole::User => "user",
628 };
629 let environment_id = self.connection_context().environment_id.clone();
630 let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
631 let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
632 let persist_pubsub_url = self.persist_pubsub_url.clone();
633 let secrets_args = self.secrets_args.to_flags();
634
635 let storage_proto_timely_config = TimelyConfig {
637 arrangement_exert_proportionality: 1337,
638 ..Default::default()
639 };
640 let compute_proto_timely_config = TimelyConfig {
641 arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
642 enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
643 enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
644 zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
645 ..Default::default()
646 };
647
648 let mut disk_limit = location.allocation.disk_limit;
649 let memory_limit = location.allocation.memory_limit;
650 let mut memory_request = None;
651
652 if location.allocation.swap_enabled {
653 disk_limit = Some(DiskLimit::ZERO);
657
658 memory_request = memory_limit.map(|MemoryLimit(limit)| {
662 let request = ByteSize::b(limit.as_u64() - 1);
663 MemoryLimit(request)
664 });
665 }
666
667 let service = self.orchestrator.ensure_service(
668 &service_name,
669 ServiceConfig {
670 image: self.clusterd_image.clone(),
671 init_container_image: self.init_container_image.clone(),
672 args: Box::new(move |assigned| {
673 let storage_timely_config = TimelyConfig {
674 workers: location.allocation.workers.get(),
675 addresses: assigned.peer_addresses("storage"),
676 ..storage_proto_timely_config
677 };
678 let compute_timely_config = TimelyConfig {
679 workers: location.allocation.workers.get(),
680 addresses: assigned.peer_addresses("compute"),
681 ..compute_proto_timely_config
682 };
683
684 let mut args = vec![
685 format!(
686 "--storage-controller-listen-addr={}",
687 assigned.listen_addrs["storagectl"]
688 ),
689 format!(
690 "--compute-controller-listen-addr={}",
691 assigned.listen_addrs["computectl"]
692 ),
693 format!(
694 "--internal-http-listen-addr={}",
695 assigned.listen_addrs["internal-http"]
696 ),
697 format!("--opentelemetry-resource=cluster_id={}", cluster_id),
698 format!("--opentelemetry-resource=replica_id={}", replica_id),
699 format!("--persist-pubsub-url={}", persist_pubsub_url),
700 format!("--environment-id={}", environment_id),
701 format!(
702 "--storage-timely-config={}",
703 storage_timely_config.to_string(),
704 ),
705 format!(
706 "--compute-timely-config={}",
707 compute_timely_config.to_string(),
708 ),
709 ];
710 if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
711 args.push(format!(
712 "--aws-external-id-prefix={}",
713 aws_external_id_prefix
714 ));
715 }
716 if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
717 args.push(format!(
718 "--aws-connection-role-arn={}",
719 aws_connection_role_arn
720 ));
721 }
722 if let Some(memory_limit) = location.allocation.memory_limit {
723 args.push(format!(
724 "--announce-memory-limit={}",
725 memory_limit.0.as_u64()
726 ));
727 }
728 if location.allocation.cpu_exclusive && enable_worker_core_affinity {
729 args.push("--worker-core-affinity".into());
730 }
731 if location.allocation.is_cc {
732 args.push("--is-cc".into());
733 }
734
735 if location.allocation.swap_enabled
738 && let Some(memory_limit) = location.allocation.memory_limit
739 && let Some(disk_limit) = location.allocation.disk_limit
740 && disk_limit != DiskLimit::ZERO
744 {
745 let heap_limit = memory_limit.0 + disk_limit.0;
746 args.push(format!("--heap-limit={}", heap_limit.as_u64()));
747 }
748
749 args.extend(secrets_args.clone());
750 args
751 }),
752 ports: vec![
753 ServicePort {
754 name: "storagectl".into(),
755 port_hint: 2100,
756 },
757 ServicePort {
761 name: "storage".into(),
762 port_hint: 2103,
763 },
764 ServicePort {
765 name: "computectl".into(),
766 port_hint: 2101,
767 },
768 ServicePort {
769 name: "compute".into(),
770 port_hint: 2102,
771 },
772 ServicePort {
773 name: "internal-http".into(),
774 port_hint: 6878,
775 },
776 ],
777 cpu_limit: location.allocation.cpu_limit,
778 memory_limit,
779 memory_request,
780 scale: location.allocation.scale,
781 labels: BTreeMap::from([
782 ("replica-id".into(), replica_id.to_string()),
783 ("cluster-id".into(), cluster_id.to_string()),
784 ("type".into(), "cluster".into()),
785 ("replica-role".into(), role_label.into()),
786 ("workers".into(), location.allocation.workers.to_string()),
787 (
788 "size".into(),
789 location
790 .size
791 .to_string()
792 .replace("=", "-")
793 .replace(",", "_"),
794 ),
795 ]),
796 annotations: BTreeMap::from([
797 ("replica-name".into(), replica_name),
798 ("cluster-name".into(), cluster_name),
799 ]),
800 availability_zones: match location.availability_zones {
801 ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
802 ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
803 },
804 other_replicas_selector: vec![
808 LabelSelector {
809 label_name: "cluster-id".to_string(),
810 logic: LabelSelectionLogic::Eq {
811 value: cluster_id.to_string(),
812 },
813 },
814 LabelSelector {
816 label_name: "replica-id".into(),
817 logic: LabelSelectionLogic::NotEq {
818 value: replica_id.to_string(),
819 },
820 },
821 ],
822 replicas_selector: vec![LabelSelector {
823 label_name: "cluster-id".to_string(),
824 logic: LabelSelectionLogic::Eq {
826 value: cluster_id.to_string(),
827 },
828 }],
829 disk_limit,
830 node_selector: location.allocation.selectors,
831 },
832 )?;
833
834 let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
835 let tx = self.metrics_tx.clone();
836 let orchestrator = Arc::clone(&self.orchestrator);
837 let service_name = service_name.clone();
838 async move {
839 const METRICS_INTERVAL: Duration = Duration::from_secs(60);
840
841 let mut interval = tokio::time::interval(METRICS_INTERVAL);
849 loop {
850 interval.tick().await;
851 match orchestrator.fetch_service_metrics(&service_name).await {
852 Ok(metrics) => {
853 let _ = tx.send((replica_id, metrics));
854 }
855 Err(e) => {
856 warn!("failed to get metrics for replica {replica_id}: {e}");
857 }
858 }
859 }
860 }
861 });
862
863 Ok((service, metrics_task.abort_on_drop()))
864 }
865
866 fn deprovision_replica(
868 &self,
869 cluster_id: ClusterId,
870 replica_id: ReplicaId,
871 generation: u64,
872 ) -> Result<(), anyhow::Error> {
873 let service_name = ReplicaServiceName {
874 cluster_id,
875 replica_id,
876 generation,
877 }
878 .to_string();
879 self.orchestrator.drop_service(&service_name)
880 }
881}
882
883async fn try_remove_past_generation_replicas(
885 orchestrator: &dyn NamespacedOrchestrator,
886 deploy_generation: u64,
887) -> Result<(), anyhow::Error> {
888 let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
889
890 for service in services {
891 let name: ReplicaServiceName = service.parse()?;
892 if name.generation < deploy_generation {
893 info!(
894 cluster_id = %name.cluster_id,
895 replica_id = %name.replica_id,
896 "removing past generation replica",
897 );
898 orchestrator.drop_service(&service)?;
899 }
900 }
901
902 Ok(())
903}
904
/// The structured form of a replica's orchestrator service name.
///
/// Rendered as `<cluster_id>-replica-<replica_id>-gen-<generation>`.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// ID of the replica.
    pub replica_id: ReplicaId,
    /// Deploy generation the service was created in.
    pub generation: u64,
}
912
913impl fmt::Display for ReplicaServiceName {
914 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
915 let ReplicaServiceName {
916 cluster_id,
917 replica_id,
918 generation,
919 } = self;
920 write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
921 }
922}
923
924impl FromStr for ReplicaServiceName {
925 type Err = anyhow::Error;
926
927 fn from_str(s: &str) -> Result<Self, Self::Err> {
928 static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
929 Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
930 });
931
932 let caps = SERVICE_NAME_RE
933 .captures(s)
934 .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
935
936 Ok(ReplicaServiceName {
937 cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
938 replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
939 generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
943 })
944 }
945}