1use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::sync::LazyLock;
17use std::time::Duration;
18
19use anyhow::anyhow;
20use bytesize::ByteSize;
21use chrono::{DateTime, Utc};
22use futures::stream::{BoxStream, StreamExt};
23use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
24use mz_compute_client::controller::ComputeControllerTimestamp;
25use mz_compute_client::logging::LogVariant;
26use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
27use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
28use mz_controller_types::dyncfgs::{
29 ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
30 ENABLE_CTP_CLUSTER_PROTOCOLS, ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC,
31 TIMELY_ZERO_COPY_LIMIT,
32};
33use mz_controller_types::{ClusterId, ReplicaId};
34use mz_orchestrator::NamespacedOrchestrator;
35use mz_orchestrator::{
36 CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
37 ServiceEvent, ServicePort,
38};
39use mz_ore::halt;
40use mz_ore::instrument;
41use mz_ore::task::{self, AbortOnDropHandle};
42use mz_repr::GlobalId;
43use mz_repr::adt::numeric::Numeric;
44use regex::Regex;
45use serde::{Deserialize, Serialize};
46use tokio::time;
47use tracing::{error, info, warn};
48
49use crate::Controller;
50
/// Configuration supplied when creating a cluster.
pub struct ClusterConfig {
    /// Maps each log variant to a `GlobalId`. Handed to the compute
    /// controller's `create_instance` when the cluster is created.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// Optional workload class string. Handed to both the storage and the
    /// compute controller when the cluster is created.
    pub workload_class: Option<String>,
}
62
/// The status carried by a [`ClusterEvent`]; an alias for the orchestrator's
/// service status type.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
65
/// Configuration of a single cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// Where the replica runs (managed by the orchestrator, or unmanaged).
    pub location: ReplicaLocation,
    /// Compute-side replica configuration, passed to the compute controller
    /// when the replica is added to its instance.
    pub compute: ComputeReplicaConfig,
}
74
/// Resource allocation of a managed cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// Memory limit handed to the orchestrator service config; also announced
    /// to the replica process via `--announce-memory-limit`.
    pub memory_limit: Option<MemoryLimit>,
    /// CPU limit handed to the orchestrator service config.
    pub cpu_limit: Option<CpuLimit>,
    /// Disk limit handed to the orchestrator service config. Replaced by
    /// `DiskLimit::ZERO` when `swap_enabled` is set.
    pub disk_limit: Option<DiskLimit>,
    /// Number of processes in the replica (the service's `scale`).
    pub scale: u16,
    /// Number of Timely workers per process.
    pub workers: usize,
    /// Credits consumed per hour. Deserialized from its string form.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// When set (and worker core affinity is enabled by the caller), the
    /// replica is started with `--worker-core-affinity`. Defaults to `false`.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// When set, the replica is started with `--is-cc`. Defaults to `true`.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// When set, provisioning zeroes the disk limit and issues a memory
    /// request slightly below the memory limit. Defaults to `false`.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Defaults to `false`. Not consumed in this file.
    // NOTE(review): presumably marks sizes that may no longer be created —
    // confirm against the code that reads this flag.
    #[serde(default)]
    pub disabled: bool,
    /// Node selectors handed to the orchestrator service config. Defaults to
    /// an empty map.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
108
/// Serde default provider returning `true`; used for `ReplicaAllocation::is_cc`.
fn default_true() -> bool {
    true
}
112
113#[mz_ore::test]
114#[cfg_attr(miri, ignore)] fn test_replica_allocation_deserialization() {
117 use bytesize::ByteSize;
118
119 let data = r#"
120 {
121 "cpu_limit": 1.0,
122 "memory_limit": "10GiB",
123 "disk_limit": "100MiB",
124 "scale": 16,
125 "workers": 1,
126 "credits_per_hour": "16",
127 "swap_enabled": true,
128 "selectors": {
129 "key1": "value1",
130 "key2": "value2"
131 }
132 }"#;
133
134 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
135 .expect("deserialization from JSON succeeds for ReplicaAllocation");
136
137 assert_eq!(
138 replica_allocation,
139 ReplicaAllocation {
140 credits_per_hour: 16.into(),
141 disk_limit: Some(DiskLimit(ByteSize::mib(100))),
142 disabled: false,
143 memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
144 cpu_limit: Some(CpuLimit::from_millicpus(1000)),
145 cpu_exclusive: false,
146 is_cc: true,
147 swap_enabled: true,
148 scale: 16,
149 workers: 1,
150 selectors: BTreeMap::from([
151 ("key1".to_string(), "value1".to_string()),
152 ("key2".to_string(), "value2".to_string())
153 ]),
154 }
155 );
156
157 let data = r#"
158 {
159 "cpu_limit": 0,
160 "memory_limit": "0GiB",
161 "disk_limit": "0MiB",
162 "scale": 0,
163 "workers": 0,
164 "credits_per_hour": "0",
165 "cpu_exclusive": true,
166 "disabled": true
167 }"#;
168
169 let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
170 .expect("deserialization from JSON succeeds for ReplicaAllocation");
171
172 assert_eq!(
173 replica_allocation,
174 ReplicaAllocation {
175 credits_per_hour: 0.into(),
176 disk_limit: Some(DiskLimit(ByteSize::mib(0))),
177 disabled: true,
178 memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
179 cpu_limit: Some(CpuLimit::from_millicpus(0)),
180 cpu_exclusive: true,
181 is_cc: true,
182 swap_enabled: false,
183 scale: 0,
184 workers: 0,
185 selectors: Default::default(),
186 }
187 );
188}
189
/// Where a cluster replica runs.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// The replica's processes are reachable at explicitly listed addresses;
    /// nothing is provisioned through the orchestrator.
    Unmanaged(UnmanagedReplicaLocation),
    /// The replica is provisioned as an orchestrator service.
    Managed(ManagedReplicaLocation),
}
198
199impl ReplicaLocation {
200 pub fn num_processes(&self) -> usize {
202 match self {
203 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
204 computectl_addrs, ..
205 }) => computectl_addrs.len(),
206 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
207 allocation.scale.into()
208 }
209 }
210 }
211
212 pub fn billed_as(&self) -> Option<&str> {
213 match self {
214 ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
215 billed_as.as_deref()
216 }
217 _ => None,
218 }
219 }
220
221 pub fn internal(&self) -> bool {
222 match self {
223 ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
224 ReplicaLocation::Unmanaged(_) => false,
225 }
226 }
227
228 pub fn workers(&self) -> Option<usize> {
232 match self {
233 ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
234 Some(allocation.workers * self.num_processes())
235 }
236 ReplicaLocation::Unmanaged(_) => None,
237 }
238 }
239
240 pub fn pending(&self) -> bool {
245 match self {
246 ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
247 _ => false,
248 }
249 }
250}
251
/// The role of a cluster. Provisioning records it on the replica's service
/// as the `replica-role` label.
pub enum ClusterRole {
    /// Labelled `system-critical`.
    SystemCritical,
    /// Labelled `system`.
    System,
    /// Labelled `user`.
    User,
}
266
/// Location of a replica that is not provisioned through the orchestrator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// Addresses the storage controller connects to.
    pub storagectl_addrs: Vec<String>,
    /// Addresses the compute controller connects to. Their count is taken as
    /// the replica's number of processes.
    pub computectl_addrs: Vec<String>,
}
277
/// Availability zone constraints of a managed replica.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// An optional list of zones, handed to the orchestrator as-is.
    // NOTE(review): presumably inherited from the owning cluster — confirm.
    FromCluster(Option<Vec<String>>),
    /// An optional single zone, handed to the orchestrator as a one-element
    /// list.
    FromReplica(Option<String>),
}
289
/// Location of a replica that is provisioned through the orchestrator.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// Resource allocation used to provision the replica's service.
    pub allocation: ReplicaAllocation,
    /// Name of the replica size; also recorded (sanitized) as the service's
    /// `size` label.
    pub size: String,
    /// Whether the replica is flagged as internal.
    pub internal: bool,
    /// Optional size to bill the replica as, overriding `size`.
    pub billed_as: Option<String>,
    /// Availability zone constraints. Skipped during serialization.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is flagged as pending.
    pub pending: bool,
}
319
320impl ManagedReplicaLocation {
321 pub fn size_for_billing(&self) -> &str {
323 self.billed_as.as_deref().unwrap_or(&self.size)
324 }
325}
326
/// Logging configuration of a replica; an alias for the compute replica
/// logging configuration type.
pub type ReplicaLogging = ComputeReplicaLogging;
329
/// Identifier of a process within a replica, as carried by [`ClusterEvent`].
pub type ProcessId = u64;
332
/// A status event for one process of a cluster replica, translated from an
/// orchestrator service event.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// Cluster parsed from the originating service name.
    pub cluster_id: ClusterId,
    /// Replica parsed from the originating service name.
    pub replica_id: ReplicaId,
    /// Process the event refers to.
    pub process_id: ProcessId,
    /// Status reported by the orchestrator.
    pub status: ClusterStatus,
    /// Timestamp reported by the orchestrator.
    pub time: DateTime<Utc>,
}
342
343impl<T> Controller<T>
344where
345 T: ComputeControllerTimestamp,
346 ComputeGrpcClient: ComputeClient<T>,
347{
348 pub fn create_cluster(
354 &mut self,
355 id: ClusterId,
356 config: ClusterConfig,
357 ) -> Result<(), anyhow::Error> {
358 self.storage
359 .create_instance(id, config.workload_class.clone());
360 self.compute
361 .create_instance(id, config.arranged_logs, config.workload_class)?;
362 Ok(())
363 }
364
365 pub fn update_cluster_workload_class(
367 &mut self,
368 id: ClusterId,
369 workload_class: Option<String>,
370 ) -> Result<(), anyhow::Error> {
371 self.storage
372 .update_instance_workload_class(id, workload_class.clone());
373 self.compute
374 .update_instance_workload_class(id, workload_class)?;
375 Ok(())
376 }
377
/// Drops the cluster with the given ID by dropping its instance from the
/// storage controller and then from the compute controller.
// NOTE(review): nothing here touches replicas — presumably callers must drop
// the cluster's replicas first; confirm.
pub fn drop_cluster(&mut self, id: ClusterId) {
    self.storage.drop_instance(id);
    self.compute.drop_instance(id);
}
387
388 pub fn create_replica(
391 &mut self,
392 cluster_id: ClusterId,
393 replica_id: ReplicaId,
394 cluster_name: String,
395 replica_name: String,
396 role: ClusterRole,
397 config: ReplicaConfig,
398 enable_worker_core_affinity: bool,
399 ) -> Result<(), anyhow::Error> {
400 let storage_location: ClusterReplicaLocation;
401 let compute_location: ClusterReplicaLocation;
402 let metrics_task: Option<AbortOnDropHandle<()>>;
403
404 let enable_ctp = ENABLE_CTP_CLUSTER_PROTOCOLS.get(&self.dyncfg);
408
409 match config.location {
410 ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
411 storagectl_addrs,
412 computectl_addrs,
413 }) => {
414 compute_location = ClusterReplicaLocation {
415 ctl_addrs: computectl_addrs,
416 };
417 storage_location = ClusterReplicaLocation {
418 ctl_addrs: storagectl_addrs,
419 };
420 metrics_task = None;
421 }
422 ReplicaLocation::Managed(m) => {
423 let (service, metrics_task_join_handle) = self.provision_replica(
424 cluster_id,
425 replica_id,
426 cluster_name,
427 replica_name,
428 role,
429 m,
430 enable_worker_core_affinity,
431 enable_ctp,
432 )?;
433 storage_location = ClusterReplicaLocation {
434 ctl_addrs: service.addresses("storagectl"),
435 };
436 compute_location = ClusterReplicaLocation {
437 ctl_addrs: service.addresses("computectl"),
438 };
439 metrics_task = Some(metrics_task_join_handle);
440 }
441 }
442
443 self.storage
444 .connect_replica(cluster_id, replica_id, storage_location, enable_ctp);
445 self.compute.add_replica_to_instance(
446 cluster_id,
447 replica_id,
448 compute_location,
449 config.compute,
450 enable_ctp,
451 )?;
452
453 if let Some(task) = metrics_task {
454 self.metrics_tasks.insert(replica_id, task);
455 }
456
457 Ok(())
458 }
459
/// Drops the given replica of the given cluster.
///
/// Deprovisions the replica's orchestrator service for the current deploy
/// generation, removes its metrics task, and drops the replica from the
/// compute and storage controllers.
///
/// # Errors
///
/// Returns an error if deprovisioning fails or if the compute controller
/// fails to drop the replica. On a deprovisioning error the remaining steps
/// are skipped.
pub fn drop_replica(
    &mut self,
    cluster_id: ClusterId,
    replica_id: ReplicaId,
) -> Result<(), anyhow::Error> {
    // Deprovisioning is attempted for every replica, including unmanaged
    // ones that were never provisioned.
    // NOTE(review): this relies on `drop_service` tolerating unknown service
    // names — confirm against the orchestrator.
    self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
    self.metrics_tasks.remove(&replica_id);

    self.compute.drop_replica(cluster_id, replica_id)?;
    self.storage.drop_replica(cluster_id, replica_id);
    Ok(())
}
477
478 pub(crate) fn remove_past_generation_replicas_in_background(&self) {
480 let deploy_generation = self.deploy_generation;
481 let dyncfg = Arc::clone(self.compute.dyncfg());
482 let orchestrator = Arc::clone(&self.orchestrator);
483 task::spawn(
484 || "controller_remove_past_generation_replicas",
485 async move {
486 info!("attempting to remove past generation replicas");
487 loop {
488 match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
489 .await
490 {
491 Ok(()) => {
492 info!("successfully removed past generation replicas");
493 return;
494 }
495 Err(e) => {
496 let interval =
497 CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
498 .get(&dyncfg);
499 warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
500 time::sleep(interval).await;
501 }
502 }
503 }
504 },
505 );
506 }
507
508 #[instrument]
510 pub async fn remove_orphaned_replicas(
511 &mut self,
512 next_user_replica_id: u64,
513 next_system_replica_id: u64,
514 ) -> Result<(), anyhow::Error> {
515 let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();
516
517 let actual: BTreeSet<_> = self
518 .orchestrator
519 .list_services()
520 .await?
521 .iter()
522 .map(|s| ReplicaServiceName::from_str(s))
523 .collect::<Result<_, _>>()?;
524
525 for ReplicaServiceName {
526 cluster_id,
527 replica_id,
528 generation,
529 } in actual
530 {
531 if generation != self.deploy_generation {
535 continue;
536 }
537
538 let smaller_next = match replica_id {
539 ReplicaId::User(id) if id >= next_user_replica_id => {
540 Some(ReplicaId::User(next_user_replica_id))
541 }
542 ReplicaId::System(id) if id >= next_system_replica_id => {
543 Some(ReplicaId::System(next_system_replica_id))
544 }
545 _ => None,
546 };
547 if let Some(next) = smaller_next {
548 halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
553 }
554 if !desired.contains(&replica_id) {
555 self.deprovision_replica(cluster_id, replica_id, generation)?;
556 }
557 }
558
559 Ok(())
560 }
561
562 pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
563 let deploy_generation = self.deploy_generation;
564
565 fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
566 let ReplicaServiceName {
567 cluster_id,
568 replica_id,
569 generation: replica_generation,
570 ..
571 } = event.service_id.parse()?;
572
573 let event = ClusterEvent {
574 cluster_id,
575 replica_id,
576 process_id: event.process_id,
577 status: event.status,
578 time: event.time,
579 };
580
581 Ok((event, replica_generation))
582 }
583
584 let stream = self
585 .orchestrator
586 .watch_services()
587 .map(|event| event.and_then(translate_event))
588 .filter_map(move |event| async move {
589 match event {
590 Ok((event, replica_generation)) => {
591 if replica_generation == deploy_generation {
592 Some(event)
593 } else {
594 None
595 }
596 }
597 Err(error) => {
598 error!("service watch error: {error}");
599 None
600 }
601 }
602 });
603
604 Box::pin(stream)
605 }
606
607 fn provision_replica(
609 &self,
610 cluster_id: ClusterId,
611 replica_id: ReplicaId,
612 cluster_name: String,
613 replica_name: String,
614 role: ClusterRole,
615 location: ManagedReplicaLocation,
616 enable_worker_core_affinity: bool,
617 enable_ctp: bool,
618 ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
619 let service_name = ReplicaServiceName {
620 cluster_id,
621 replica_id,
622 generation: self.deploy_generation,
623 }
624 .to_string();
625 let role_label = match role {
626 ClusterRole::SystemCritical => "system-critical",
627 ClusterRole::System => "system",
628 ClusterRole::User => "user",
629 };
630 let environment_id = self.connection_context().environment_id.clone();
631 let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
632 let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
633 let persist_pubsub_url = self.persist_pubsub_url.clone();
634 let secrets_args = self.secrets_args.to_flags();
635
636 let storage_proto_timely_config = TimelyConfig {
638 arrangement_exert_proportionality: 1337,
639 ..Default::default()
640 };
641 let compute_proto_timely_config = TimelyConfig {
642 arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
643 enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
644 enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
645 zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
646 ..Default::default()
647 };
648
649 let mut disk_limit = location.allocation.disk_limit;
650 let memory_limit = location.allocation.memory_limit;
651 let mut memory_request = None;
652
653 if location.allocation.swap_enabled {
654 disk_limit = Some(DiskLimit::ZERO);
658
659 memory_request = memory_limit.map(|MemoryLimit(limit)| {
663 let request = ByteSize::b(limit.as_u64() - 1);
664 MemoryLimit(request)
665 });
666 }
667
668 let service = self.orchestrator.ensure_service(
669 &service_name,
670 ServiceConfig {
671 image: self.clusterd_image.clone(),
672 init_container_image: self.init_container_image.clone(),
673 args: Box::new(move |assigned| {
674 let storage_timely_config = TimelyConfig {
675 workers: location.allocation.workers,
676 addresses: assigned.peer_addresses("storage"),
677 ..storage_proto_timely_config
678 };
679 let compute_timely_config = TimelyConfig {
680 workers: location.allocation.workers,
681 addresses: assigned.peer_addresses("compute"),
682 ..compute_proto_timely_config
683 };
684
685 let mut args = vec![
686 format!(
687 "--storage-controller-listen-addr={}",
688 assigned.listen_addrs["storagectl"]
689 ),
690 format!(
691 "--compute-controller-listen-addr={}",
692 assigned.listen_addrs["computectl"]
693 ),
694 format!(
695 "--internal-http-listen-addr={}",
696 assigned.listen_addrs["internal-http"]
697 ),
698 format!("--opentelemetry-resource=cluster_id={}", cluster_id),
699 format!("--opentelemetry-resource=replica_id={}", replica_id),
700 format!("--persist-pubsub-url={}", persist_pubsub_url),
701 format!("--environment-id={}", environment_id),
702 format!(
703 "--storage-timely-config={}",
704 storage_timely_config.to_string(),
705 ),
706 format!(
707 "--compute-timely-config={}",
708 compute_timely_config.to_string(),
709 ),
710 ];
711 if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
712 args.push(format!(
713 "--aws-external-id-prefix={}",
714 aws_external_id_prefix
715 ));
716 }
717 if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
718 args.push(format!(
719 "--aws-connection-role-arn={}",
720 aws_connection_role_arn
721 ));
722 }
723 if let Some(memory_limit) = location.allocation.memory_limit {
724 args.push(format!(
725 "--announce-memory-limit={}",
726 memory_limit.0.as_u64()
727 ));
728 }
729 if location.allocation.cpu_exclusive && enable_worker_core_affinity {
730 args.push("--worker-core-affinity".into());
731 }
732 if location.allocation.is_cc {
733 args.push("--is-cc".into());
734 }
735
736 if enable_ctp {
737 args.push("--use-ctp".into());
738 }
739
740 args.extend(secrets_args.clone());
741 args
742 }),
743 ports: vec![
744 ServicePort {
745 name: "storagectl".into(),
746 port_hint: 2100,
747 },
748 ServicePort {
752 name: "storage".into(),
753 port_hint: 2103,
754 },
755 ServicePort {
756 name: "computectl".into(),
757 port_hint: 2101,
758 },
759 ServicePort {
760 name: "compute".into(),
761 port_hint: 2102,
762 },
763 ServicePort {
764 name: "internal-http".into(),
765 port_hint: 6878,
766 },
767 ],
768 cpu_limit: location.allocation.cpu_limit,
769 memory_limit,
770 memory_request,
771 scale: location.allocation.scale,
772 labels: BTreeMap::from([
773 ("replica-id".into(), replica_id.to_string()),
774 ("cluster-id".into(), cluster_id.to_string()),
775 ("type".into(), "cluster".into()),
776 ("replica-role".into(), role_label.into()),
777 ("workers".into(), location.allocation.workers.to_string()),
778 (
779 "size".into(),
780 location
781 .size
782 .to_string()
783 .replace("=", "-")
784 .replace(",", "_"),
785 ),
786 ]),
787 annotations: BTreeMap::from([
788 ("replica-name".into(), replica_name),
789 ("cluster-name".into(), cluster_name),
790 ]),
791 availability_zones: match location.availability_zones {
792 ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
793 ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
794 },
795 other_replicas_selector: vec![
799 LabelSelector {
800 label_name: "cluster-id".to_string(),
801 logic: LabelSelectionLogic::Eq {
802 value: cluster_id.to_string(),
803 },
804 },
805 LabelSelector {
807 label_name: "replica-id".into(),
808 logic: LabelSelectionLogic::NotEq {
809 value: replica_id.to_string(),
810 },
811 },
812 ],
813 replicas_selector: vec![LabelSelector {
814 label_name: "cluster-id".to_string(),
815 logic: LabelSelectionLogic::Eq {
817 value: cluster_id.to_string(),
818 },
819 }],
820 disk_limit,
821 node_selector: location.allocation.selectors,
822 },
823 )?;
824
825 let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
826 let tx = self.metrics_tx.clone();
827 let orchestrator = Arc::clone(&self.orchestrator);
828 let service_name = service_name.clone();
829 async move {
830 const METRICS_INTERVAL: Duration = Duration::from_secs(60);
831
832 let mut interval = tokio::time::interval(METRICS_INTERVAL);
840 loop {
841 interval.tick().await;
842 match orchestrator.fetch_service_metrics(&service_name).await {
843 Ok(metrics) => {
844 let _ = tx.send((replica_id, metrics));
845 }
846 Err(e) => {
847 warn!("failed to get metrics for replica {replica_id}: {e}");
848 }
849 }
850 }
851 }
852 });
853
854 Ok((service, metrics_task.abort_on_drop()))
855 }
856
857 fn deprovision_replica(
859 &self,
860 cluster_id: ClusterId,
861 replica_id: ReplicaId,
862 generation: u64,
863 ) -> Result<(), anyhow::Error> {
864 let service_name = ReplicaServiceName {
865 cluster_id,
866 replica_id,
867 generation,
868 }
869 .to_string();
870 self.orchestrator.drop_service(&service_name)
871 }
872}
873
874async fn try_remove_past_generation_replicas(
876 orchestrator: &dyn NamespacedOrchestrator,
877 deploy_generation: u64,
878) -> Result<(), anyhow::Error> {
879 let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
880
881 for service in services {
882 let name: ReplicaServiceName = service.parse()?;
883 if name.generation < deploy_generation {
884 info!(
885 cluster_id = %name.cluster_id,
886 replica_id = %name.replica_id,
887 "removing past generation replica",
888 );
889 orchestrator.drop_service(&service)?;
890 }
891 }
892
893 Ok(())
894}
895
/// The name of a cluster replica's service in the orchestrator.
///
/// Rendered as `{cluster_id}-replica-{replica_id}-gen-{generation}` and
/// parsed back from that form.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// The cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The replica the service runs.
    pub replica_id: ReplicaId,
    /// The deploy generation that provisioned the service.
    pub generation: u64,
}
903
904impl fmt::Display for ReplicaServiceName {
905 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
906 let ReplicaServiceName {
907 cluster_id,
908 replica_id,
909 generation,
910 } = self;
911 write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
912 }
913}
914
915impl FromStr for ReplicaServiceName {
916 type Err = anyhow::Error;
917
918 fn from_str(s: &str) -> Result<Self, Self::Err> {
919 static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
920 Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
921 });
922
923 let caps = SERVICE_NAME_RE
924 .captures(s)
925 .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
926
927 Ok(ReplicaServiceName {
928 cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
929 replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
930 generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
934 })
935 }
936}