use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::anyhow;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt};
use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
use mz_compute_client::controller::ComputeControllerTimestamp;
use mz_compute_client::logging::LogVariant;
use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
use mz_controller_types::dyncfgs::{
    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_orchestrator::NamespacedOrchestrator;
use mz_orchestrator::{
    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
    ServiceEvent, ServicePort,
};
use mz_ore::halt;
use mz_ore::instrument;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_repr::GlobalId;
use mz_repr::adt::numeric::Numeric;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::time;
use tracing::{error, info, warn};

use crate::Controller;

/// Configuration of a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance, mapped to the
    /// identifiers under which to register the corresponding arrangements.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional arbitrary string that describes the class of workload the
    /// cluster runs (e.g., `production` or `staging`).
    pub workload_class: Option<String>,
}

/// The status of a cluster.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;

/// Configuration of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica.
    pub location: ReplicaLocation,
    /// The compute-side configuration of the replica.
    pub compute: ComputeReplicaConfig,
}

/// Allocation of resources to a replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: u16,
    /// The number of worker threads in each process of the replica.
    pub workers: usize,
    /// The number of credits per hour that the replica consumes.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process gets exclusive access to its CPU cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this size represents a modern "cc" size rather than a legacy
    /// size.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether swap is enabled for this replica.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether this size is disabled.
    #[serde(default)]
    pub disabled: bool,
    /// Node selectors for the replica's processes.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}

fn default_true() -> bool {
    true
}

#[mz_ore::test]
#[cfg_attr(miri, ignore)] // `Numeric` parsing calls into a C decimal library, which Miri can't execute.
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;

    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: 16,
            workers: 1,
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 0,
            "workers": 0,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: 0,
            workers: 0,
            selectors: Default::default(),
        }
    );
}

/// The location of a replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica, whose processes the user provisions and manages.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica, whose processes the orchestrator provisions and
    /// manages.
    Managed(ManagedReplicaLocation),
}

impl ReplicaLocation {
    /// Returns the number of processes that make up this replica.
    pub fn num_processes(&self) -> usize {
        match self {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                computectl_addrs, ..
            }) => computectl_addrs.len(),
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                allocation.scale.into()
            }
        }
    }

    /// Returns the size this replica is billed as, if it differs from its
    /// actual size.
    pub fn billed_as(&self) -> Option<&str> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
                billed_as.as_deref()
            }
            _ => None,
        }
    }

    /// Reports whether this replica is internal, i.e., not user-facing.
    pub fn internal(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
            ReplicaLocation::Unmanaged(_) => false,
        }
    }

    /// Returns the total number of worker threads across all processes of
    /// this replica, if known.
    pub fn workers(&self) -> Option<usize> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                Some(allocation.workers * self.num_processes())
            }
            ReplicaLocation::Unmanaged(_) => None,
        }
    }

    /// Reports whether this replica is pending, i.e., part of an in-progress
    /// cluster reconfiguration and not yet fully active.
    pub fn pending(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
            _ => false,
        }
    }
}
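
// A minimal sketch exercising the `ReplicaLocation` accessors on the
// unmanaged variant, where the process count derives from the number of
// `computectl` addresses and the worker count is unknown. The addresses are
// illustrative placeholders.
#[mz_ore::test]
fn test_unmanaged_replica_location_accessors() {
    let location = ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
        storagectl_addrs: vec!["host1:2100".into(), "host2:2100".into()],
        computectl_addrs: vec!["host1:2101".into(), "host2:2101".into()],
    });
    assert_eq!(location.num_processes(), 2);
    assert_eq!(location.workers(), None);
    assert_eq!(location.billed_as(), None);
    assert!(!location.internal());
    assert!(!location.pending());
}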

/// The role of a cluster.
pub enum ClusterRole {
    /// The cluster is a system cluster critical to the operation of the
    /// environment.
    SystemCritical,
    /// The cluster is a system cluster, but the environment can tolerate its
    /// temporary unavailability.
    System,
    /// The cluster is a user cluster.
    User,
}

/// The location of an unmanaged replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    pub computectl_addrs: Vec<String>,
}

/// The availability zones of a managed replica.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// The replica follows the availability zones specified on its cluster,
    /// if any.
    FromCluster(Option<Vec<String>>),
    /// The replica specifies its own availability zone, if any.
    FromReplica(Option<String>),
}

/// The location of a managed replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The size of the replica, as specified by the user at creation time.
    pub size: String,
    /// Whether the replica is internal, i.e., not directly created by a user.
    pub internal: bool,
    /// The size as which the replica should be billed, if different from its
    /// actual size.
    pub billed_as: Option<String>,
    /// The availability zones the replica may be provisioned in.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending, i.e., part of an in-progress cluster
    /// reconfiguration and not yet fully active.
    pub pending: bool,
}

impl ManagedReplicaLocation {
    /// Returns the size as which the replica should be billed, accounting for
    /// a possible `billed_as` override.
    pub fn size_for_billing(&self) -> &str {
        self.billed_as.as_deref().unwrap_or(&self.size)
    }
}
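
// A brief sketch of `size_for_billing`: a `billed_as` override, when present,
// takes precedence over the replica's actual size. All field values here are
// illustrative placeholders.
#[mz_ore::test]
fn test_size_for_billing() {
    let location = ManagedReplicaLocation {
        allocation: ReplicaAllocation {
            memory_limit: None,
            cpu_limit: None,
            disk_limit: None,
            scale: 1,
            workers: 1,
            credits_per_hour: 1.into(),
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: false,
            disabled: false,
            selectors: BTreeMap::new(),
        },
        size: "small".into(),
        internal: false,
        billed_as: Some("free".into()),
        availability_zones: ManagedReplicaAvailabilityZones::FromCluster(None),
        pending: false,
    };
    assert_eq!(location.size_for_billing(), "free");

    // Without an override, the actual size is used.
    let unbilled = ManagedReplicaLocation {
        billed_as: None,
        ..location
    };
    assert_eq!(unbilled.size_for_billing(), "small");
}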

/// Logging configuration of a replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;

/// An event reporting a change in the status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub process_id: ProcessId,
    pub status: ClusterStatus,
    pub time: DateTime<Utc>,
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
{
    /// Creates a cluster with the specified identifier and configuration.
    ///
    /// A cluster is a combination of a storage instance and a compute
    /// instance. A cluster has zero or more replicas; each replica colocates
    /// the storage and compute layers on the same physical resources.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class of the specified cluster.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the specified cluster.
    ///
    /// # Panics
    ///
    /// Panics if the cluster still has replicas.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster with the specified
    /// identifier and configuration.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                computectl_addrs,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    cluster_name,
                    replica_name,
                    role,
                    m,
                    enable_worker_core_affinity,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                };
                metrics_task = Some(metrics_task_join_handle);
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops the specified replica of the specified cluster.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Spawns a background task that removes replicas belonging to past
    /// deploy generations, retrying until it succeeds.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Removes replicas that exist in the orchestrator but not in the
    /// controller's state.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // Only consider replicas of the current deploy generation.
            // Replicas of past generations are cleaned up by
            // `remove_past_generation_replicas_in_background`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // The orchestrator is running a replica with an ID that we
                // haven't allocated yet. Something has gone seriously wrong,
                // so the only safe option is to halt.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of cluster replica status events.
    ///
    /// Only events for replicas of the current deploy generation are
    /// reported; events for other generations are filtered out.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a replica with the orchestrator, returning the service
    /// handle and a task that periodically fetches the service's metrics.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();

        let storage_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: 1337,
            ..Default::default()
        };
        let compute_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
            ..Default::default()
        };

        let mut disk_limit = location.allocation.disk_limit;
        let memory_limit = location.allocation.memory_limit;
        let mut memory_request = None;

        if location.allocation.swap_enabled {
            // With swap enabled, swap space takes the place of the scratch
            // disk, so don't provision one.
            disk_limit = Some(DiskLimit::ZERO);

            // Request slightly less memory than the limit. On Kubernetes,
            // only pods whose requests differ from their limits (i.e., pods
            // outside the `Guaranteed` QoS class) are permitted to use swap.
            memory_request = memory_limit.map(|MemoryLimit(limit)| {
                let request = ByteSize::b(limit.as_u64() - 1);
                MemoryLimit(request)
            });
        }

        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    let storage_timely_config = TimelyConfig {
                        workers: location.allocation.workers,
                        addresses: assigned.peer_addresses("storage"),
                        ..storage_proto_timely_config
                    };
                    let compute_timely_config = TimelyConfig {
                        workers: location.allocation.workers,
                        addresses: assigned.peer_addresses("compute"),
                        ..compute_proto_timely_config
                    };

                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned.listen_addrs["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned.listen_addrs["computectl"]
                        ),
                        format!(
                            "--internal-http-listen-addr={}",
                            assigned.listen_addrs["internal-http"]
                        ),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                        format!("--storage-timely-config={}", storage_timely_config),
                        format!("--compute-timely-config={}", compute_timely_config),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    // With swap enabled, the configured disk limit serves as
                    // the swap size, so the total heap available to the
                    // replica is its memory limit plus its disk limit.
                    if location.allocation.swap_enabled
                        && let Some(memory_limit) = location.allocation.memory_limit
                        && let Some(disk_limit) = location.allocation.disk_limit
                        && disk_limit != DiskLimit::ZERO
                    {
                        let heap_limit = memory_limit.0 + disk_limit.0;
                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
                    }

                    args.extend(secrets_args.clone());
                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    // The port used for Timely communication between the
                    // replica's storage processes.
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                memory_limit,
                memory_request,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    (
                        "size".into(),
                        // Sanitize the size so it is a valid label value:
                        // `=` and `,` are not permitted.
                        location
                            .size
                            .to_string()
                            .replace("=", "-")
                            .replace(",", "_"),
                    ),
                ]),
                annotations: BTreeMap::from([
                    ("replica-name".into(), replica_name),
                    ("cluster-name".into(), cluster_name),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // Select all replicas of this cluster other than the one
                // being provisioned, e.g. for anti-affinity placement.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // Fetch the service's metrics on a fixed cadence and forward
                // them to the controller. Failures are logged and retried on
                // the next tick.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica with the orchestrator by dropping its service.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}

/// Removes all replica services belonging to deploy generations before
/// `deploy_generation`.
async fn try_remove_past_generation_replicas(
    orchestrator: &dyn NamespacedOrchestrator,
    deploy_generation: u64,
) -> Result<(), anyhow::Error> {
    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();

    for service in services {
        let name: ReplicaServiceName = service.parse()?;
        if name.generation < deploy_generation {
            info!(
                cluster_id = %name.cluster_id,
                replica_id = %name.replica_id,
                "removing past generation replica",
            );
            orchestrator.drop_service(&service)?;
        }
    }

    Ok(())
}

/// A name identifying the orchestrator service of a cluster replica, encoding
/// the cluster ID, the replica ID, and the deploy generation the replica was
/// created in.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub generation: u64,
}

impl fmt::Display for ReplicaServiceName {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } = self;
        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
    }
}

impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            // Service names from before deploy generations were introduced
            // lack the `-gen-` suffix; treat those as generation 0.
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}
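
// A sketch of a round-trip check for `ReplicaServiceName`, illustrating the
// service name format and the fallback to generation 0 for legacy names that
// lack the `-gen-` suffix. The IDs used here are arbitrary examples.
#[mz_ore::test]
fn test_replica_service_name_parsing() {
    // Modern names round-trip through parsing and formatting.
    let name: ReplicaServiceName = "u1-replica-u3-gen-2".parse().expect("valid name");
    assert_eq!(name.generation, 2);
    assert_eq!(name.to_string(), "u1-replica-u3-gen-2");

    // Legacy names without a generation suffix parse as generation 0 and
    // format with an explicit `-gen-0` suffix.
    let legacy: ReplicaServiceName = "s1-replica-s2".parse().expect("valid legacy name");
    assert_eq!(legacy.generation, 0);
    assert_eq!(legacy.to_string(), "s1-replica-s2-gen-0");

    // Names that don't match the expected format are rejected.
    assert!("not-a-replica-service".parse::<ReplicaServiceName>().is_err());
}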