mz_controller/
clusters.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Cluster management.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::anyhow;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt};
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_compute_client::controller::ComputeControllerTimestamp;
use mz_compute_client::logging::LogVariant;
use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
use mz_controller_types::dyncfgs::CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_orchestrator::NamespacedOrchestrator;
use mz_orchestrator::{
    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
    ServiceEvent, ServicePort,
};
use mz_ore::halt;
use mz_ore::instrument;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_repr::GlobalId;
use mz_repr::adt::numeric::Numeric;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::time;
use tracing::{error, info, warn};

use crate::Controller;

/// Configures a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to register
    /// the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional arbitrary string that describes the class of the workload
    /// this cluster is running (e.g., `production` or `staging`).
    pub workload_class: Option<String>,
}

/// The status of a cluster.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;

/// Configures a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica.
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}

/// Configures the resource allocation for a cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: u16,
    /// The number of worker threads in the replica.
    pub workers: usize,
    /// The number of credits per hour that the replica consumes.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process has exclusive access to its CPU cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether instances of this type can be created.
    #[serde(default)]
    pub disabled: bool,
    /// Additional node selectors.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}

fn default_true() -> bool {
    true
}

#[mz_ore::test]
// We test this particularly because we deserialize values from strings.
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;

    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_exclusive: false,
            is_cc: true,
            scale: 16,
            workers: 1,
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 0,
            "workers": 0,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_exclusive: true,
            is_cc: true,
            scale: 0,
            workers: 0,
            selectors: Default::default(),
        }
    );
}

/// Configures the location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica.
    Managed(ManagedReplicaLocation),
}

impl ReplicaLocation {
    /// Returns the number of processes specified by this replica location.
    pub fn num_processes(&self) -> usize {
        match self {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                computectl_addrs, ..
            }) => computectl_addrs.len(),
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                allocation.scale.into()
            }
        }
    }

    /// Returns the size the replica is billed as, if it was created with an
    /// explicit `billed_as` override.
    pub fn billed_as(&self) -> Option<&str> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
                billed_as.as_deref()
            }
            _ => None,
        }
    }

    /// Reports whether the replica is internal, i.e., owned by Materialize support.
    pub fn internal(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
            ReplicaLocation::Unmanaged(_) => false,
        }
    }

    /// Returns the total number of worker threads across all processes of the
    /// replica.
    pub fn workers(&self) -> usize {
        let workers_per_process = match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                allocation.workers
            }
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation { workers, .. }) => *workers,
        };
        workers_per_process * self.num_processes()
    }

    /// Reports whether the replica is pending.
    ///
    /// A pending replica is created as part of an `ALTER CLUSTER` on a managed
    /// cluster. The configuration of a pending replica will not match that of
    /// the cluster until the alter has been finalized, promoting the pending
    /// replicas and setting this value to false.
    pub fn pending(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
            _ => false,
        }
    }
}
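
// A small illustrative test of the sizing helpers above (a sketch, not part of the
// original test suite): for an unmanaged replica, the process count is derived from
// the number of computectl addresses and the worker count is per process. The
// addresses below are placeholders.
#[mz_ore::test]
fn test_unmanaged_replica_location_sizing() {
    let location = ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
        storagectl_addrs: vec!["host-0:2100".into(), "host-1:2100".into()],
        storage_addrs: vec!["host-0:2103".into(), "host-1:2103".into()],
        computectl_addrs: vec!["host-0:2101".into(), "host-1:2101".into()],
        compute_addrs: vec!["host-0:2102".into(), "host-1:2102".into()],
        workers: 4,
    });

    // Two processes with four workers each.
    assert_eq!(location.num_processes(), 2);
    assert_eq!(location.workers(), 8);
    // Unmanaged replicas are never internal, billed specially, or pending.
    assert!(!location.internal());
    assert_eq!(location.billed_as(), None);
    assert!(!location.pending());
}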

/// The "role" of a cluster, which is currently used to determine the
/// severity of alerts for problems with its replicas.
pub enum ClusterRole {
    /// The existence and proper functioning of the cluster's replicas is
    /// business-critical for Materialize.
    SystemCritical,
    /// Assuming no bugs, the cluster's replicas should always exist and function
    /// properly. If they don't, however, that is less urgent than
    /// would be the case for a `SystemCritical` replica.
    System,
    /// The cluster is controlled by the user, and might go down for
    /// reasons outside our control (e.g., OOMs).
    User,
}

/// The location of an unmanaged replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the storage (Timely) endpoints for
    /// each process in the replica.
    pub storage_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    pub computectl_addrs: Vec<String>,
    /// The network addresses of the compute (Timely) endpoints for
    /// each process in the replica.
    pub compute_addrs: Vec<String>,
    /// The workers per process in the replica.
    pub workers: usize,
}

/// Information about availability zone constraints for replicas.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Specified if the `Replica` is from a `MANAGED` cluster,
    /// and specifies if there is an `AVAILABILITY ZONES`
    /// constraint. Empty lists are represented as `None`.
    FromCluster(Option<Vec<String>>),
    /// Specified if the `Replica` is from a non-`MANAGED` cluster,
    /// and specifies if there is a specific `AVAILABILITY ZONE`.
    FromReplica(Option<String>),
}

/// The location of a managed replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The SQL size parameter used for allocation.
    pub size: String,
    /// If `true`, Materialize support owns this replica.
    pub internal: bool,
    /// Optional SQL size parameter used for billing.
    pub billed_as: Option<String>,
    /// The replica's availability zones, if specified.
    ///
    /// This is either the replica's specific `AVAILABILITY ZONE`,
    /// or the zones placed here during replica concretization
    /// from the `MANAGED` cluster config.
    ///
    /// We skip serialization (which is used for some validation
    /// in tests) as the latter case is a "virtual" piece of information
    /// that exists only at runtime.
    ///
    /// An empty list of availability zones is concretized as `None`,
    /// as the on-disk serialization of `MANAGED CLUSTER AVAILABILITY ZONES`
    /// is an empty list if none are specified.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica needs scratch disk space.
    pub disk: bool,
    /// Whether the replica is pending reconfiguration.
    pub pending: bool,
}

impl ManagedReplicaLocation {
    /// Returns the size that should be used to determine billing-related information.
    pub fn size_for_billing(&self) -> &str {
        self.billed_as.as_deref().unwrap_or(&self.size)
    }
}
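
// A small illustrative check of the billing-size fallback (a sketch, not part of the
// original test suite), using placeholder sizes: `billed_as` takes precedence over the
// actual SQL size when it is set.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // constructing a `Numeric` calls foreign functions
fn test_size_for_billing_prefers_billed_as() {
    let allocation = ReplicaAllocation {
        memory_limit: None,
        cpu_limit: None,
        disk_limit: None,
        scale: 1,
        workers: 1,
        credits_per_hour: 1.into(),
        cpu_exclusive: false,
        is_cc: true,
        disabled: false,
        selectors: BTreeMap::new(),
    };

    let mut location = ManagedReplicaLocation {
        allocation,
        size: "small".into(),
        internal: false,
        billed_as: None,
        availability_zones: ManagedReplicaAvailabilityZones::FromReplica(None),
        disk: false,
        pending: false,
    };

    // Without an override, billing uses the replica's actual size.
    assert_eq!(location.size_for_billing(), "small");

    // With an override, the `billed_as` size wins.
    location.billed_as = Some("large".into());
    assert_eq!(location.size_for_billing(), "large");
}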

/// Configures logging for a cluster replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;

/// An event describing a change in status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The ID of the cluster in which the status change occurred.
    pub cluster_id: ClusterId,
    /// The ID of the affected replica.
    pub replica_id: ReplicaId,
    /// The ID of the affected process within the replica.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// The time at which the status change occurred.
    pub time: DateTime<Utc>,
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
    /// Creates a cluster with the specified identifier and configuration.
    ///
    /// A cluster is a combination of a storage instance and a compute instance.
    /// A cluster has zero or more replicas; each replica colocates the storage
    /// and compute layers on the same physical resources.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage.create_instance(id);
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class for a cluster.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the specified cluster.
    ///
    /// # Panics
    ///
    /// Panics if the cluster still has replicas.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster with the specified identifier
    /// and configuration.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                storage_addrs,
                computectl_addrs,
                compute_addrs,
                workers,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                    dataflow_addrs: compute_addrs,
                    workers,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                    dataflow_addrs: storage_addrs,
                    // Storage and compute on the same replica have linked sizes.
                    workers,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                let workers = m.allocation.workers;
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    role,
                    m,
                    enable_worker_core_affinity,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                    dataflow_addrs: service.addresses("storage"),
                    workers,
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                    dataflow_addrs: service.addresses("compute"),
                    workers,
                };
                metrics_task = Some(metrics_task_join_handle);
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops the specified replica of the specified cluster.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // We unconditionally deprovision even for unmanaged replicas to avoid
        // needing to keep track of which replicas are managed and which are
        // unmanaged. Deprovisioning is a no-op if the replica ID was never
        // provisioned.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Removes replicas from past generations in a background task.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Remove replicas that are orphaned in the current generation.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // We limit our attention here to replicas from the current deploy
            // generation. Replicas from past generations are cleaned up during
            // `Controller::allow_writes`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // Found a replica in the orchestrator with a higher replica ID
                // than what we are aware of. This must have been created by an
                // environmentd that's competing for control of this generation.
                // Abort to let the other process have full control.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of cluster replica status events, filtered to replicas
    /// of the current deploy generation.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a replica with the service orchestrator.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();
        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned["computectl"]
                        ),
                        format!("--internal-http-listen-addr={}", assigned["internal-http"]),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    args.extend(secrets_args.clone());

                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    // To simplify the changes to tests, the port
                    // chosen here is _after_ the compute ones.
                    // TODO(petrosagg): fix the numerical ordering here
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                memory_limit: location.allocation.memory_limit,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    ("size".into(), location.size.to_string()),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // This provides the orchestrator with some label selectors that
                // are used to constrain the scheduling of replicas, based on
                // its internal configuration.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    // Select other replicas (but not oneself)
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    // Select ALL replicas.
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit: location.allocation.disk_limit,
                disk: location.disk,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // TODO[btv] -- I tried implementing a `watch_metrics` function,
                // similar to `watch_services`, but it crashed due to
                // https://github.com/kube-rs/kube/issues/1092 .
                //
                // If `metrics-server` can be made to fill in `resourceVersion`,
                // or if that bug is fixed, we can try that again rather than using this inelegant
                // loop.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica with the service orchestrator.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}

/// Remove all replicas from past generations.
async fn try_remove_past_generation_replicas(
    orchestrator: &dyn NamespacedOrchestrator,
    deploy_generation: u64,
) -> Result<(), anyhow::Error> {
    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();

    for service in services {
        let name: ReplicaServiceName = service.parse()?;
        if name.generation < deploy_generation {
            info!(
                cluster_id = %name.cluster_id,
                replica_id = %name.replica_id,
                "removing past generation replica",
            );
            orchestrator.drop_service(&service)?;
        }
    }

    Ok(())
}

/// Represents the name of a cluster replica service in the orchestrator.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The deploy generation of the environment that created the replica.
    pub generation: u64,
}

impl fmt::Display for ReplicaServiceName {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } = self;
        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
    }
}

impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            // Old versions of Materialize did not include generations in
            // replica service names. Synthesize generation 0 if absent.
            // TODO: remove this in the next version of Materialize.
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}
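
// A small round-trip sketch for `ReplicaServiceName` (illustrative, not part of the
// original test suite). It relies only on behavior visible above: `Display` writes
// `{cluster_id}-replica-{replica_id}-gen-{generation}`, and `FromStr` parses the same
// `u<N>`/`s<N>` forms, synthesizing generation 0 when the suffix is absent.
#[mz_ore::test]
fn test_replica_service_name_round_trip() {
    // A current-format name parses and formats back to itself.
    let name: ReplicaServiceName = "u3-replica-u7-gen-2".parse().expect("valid service name");
    assert_eq!(name.generation, 2);
    assert_eq!(name.to_string(), "u3-replica-u7-gen-2");

    // Names written by old versions of Materialize omit the generation suffix;
    // parsing synthesizes generation 0 for them.
    let name: ReplicaServiceName = "s1-replica-s2".parse().expect("valid legacy service name");
    assert_eq!(name.generation, 0);
    assert_eq!(name.to_string(), "s1-replica-s2-gen-0");

    // Anything else is rejected.
    assert!("not-a-replica-service".parse::<ReplicaServiceName>().is_err());
}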