mz_controller/clusters.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Cluster management.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::anyhow;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt};
use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
use mz_compute_client::controller::ComputeControllerTimestamp;
use mz_compute_client::logging::LogVariant;
use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
use mz_controller_types::dyncfgs::{
    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_orchestrator::NamespacedOrchestrator;
use mz_orchestrator::{
    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
    ServiceEvent, ServicePort,
};
use mz_ore::halt;
use mz_ore::instrument;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_repr::GlobalId;
use mz_repr::adt::numeric::Numeric;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::time;
use tracing::{error, info, warn};

use crate::Controller;

/// Configures a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to register
    /// the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional arbitrary string that describes the class of the workload
    /// this cluster is running (e.g., `production` or `staging`).
    pub workload_class: Option<String>,
}

/// The status of a cluster.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;

/// Configures a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica.
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}

/// Configures the resource allocation for a cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: u16,
    /// The number of worker threads per process in the replica.
    pub workers: usize,
    /// The number of credits per hour that the replica consumes.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process has exclusive access to its CPU cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether instances of this type use swap as the spill-to-disk mechanism.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether instances of this type can be created.
    #[serde(default)]
    pub disabled: bool,
    /// Additional node selectors.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}

fn default_true() -> bool {
    true
}

#[mz_ore::test]
// We test this particularly because we deserialize values from strings.
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;

    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: 16,
            workers: 1,
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 0,
            "workers": 0,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: 0,
            workers: 0,
            selectors: Default::default(),
        }
    );
}

/// Configures the location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica.
    Managed(ManagedReplicaLocation),
}

impl ReplicaLocation {
    /// Returns the number of processes specified by this replica location.
    pub fn num_processes(&self) -> usize {
        match self {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                computectl_addrs, ..
            }) => computectl_addrs.len(),
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                allocation.scale.into()
            }
        }
    }

    /// Returns the size the replica is billed as, if one was specified.
    pub fn billed_as(&self) -> Option<&str> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
                billed_as.as_deref()
            }
            _ => None,
        }
    }

    /// Reports whether Materialize support owns this replica.
    pub fn internal(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
            ReplicaLocation::Unmanaged(_) => false,
        }
    }
    /// Returns the number of workers specified by this replica location.
    ///
    /// `None` for unmanaged replicas, whose worker count we don't know.
    pub fn workers(&self) -> Option<usize> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                Some(allocation.workers * self.num_processes())
            }
            ReplicaLocation::Unmanaged(_) => None,
        }
    }

    /// A pending replica is created as part of an `ALTER CLUSTER` operation on
    /// a managed cluster. The configuration of a pending replica will not match
    /// that of the cluster until the alter has been finalized, promoting the
    /// pending replicas and setting this value to `false`.
    pub fn pending(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
            _ => false,
        }
    }
}
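
// The following test is an illustrative sketch (not part of the original file): it
// exercises the `ReplicaLocation` accessors above for an unmanaged replica, whose
// process count is derived from its computectl addresses and whose worker count is
// unknown. The addresses used are hypothetical placeholders.
#[mz_ore::test]
fn test_unmanaged_replica_location_accessors() {
    let location = ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
        storagectl_addrs: vec!["host-0:2100".into(), "host-1:2100".into()],
        computectl_addrs: vec!["host-0:2101".into(), "host-1:2101".into()],
    });

    assert_eq!(location.num_processes(), 2);
    assert_eq!(location.workers(), None);
    assert_eq!(location.billed_as(), None);
    assert!(!location.internal());
    assert!(!location.pending());
}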

/// The "role" of a cluster, which is currently used to determine the
/// severity of alerts for problems with its replicas.
pub enum ClusterRole {
    /// The existence and proper functioning of the cluster's replicas is
    /// business-critical for Materialize.
    SystemCritical,
    /// Assuming no bugs, the cluster's replicas should always exist and function
    /// properly. If they don't, however, that is less urgent than it would be
    /// for a `SystemCritical` replica.
    System,
    /// The cluster is controlled by the user, and might go down for
    /// reasons outside our control (e.g., OOMs).
    User,
}

/// The location of an unmanaged replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    pub computectl_addrs: Vec<String>,
}

/// Information about availability zone constraints for replicas.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Used if the replica is from a `MANAGED` cluster, and specifies the
    /// cluster's `AVAILABILITY ZONES` constraint, if any. Empty lists are
    /// represented as `None`.
    FromCluster(Option<Vec<String>>),
    /// Used if the replica is from a non-`MANAGED` cluster, and specifies the
    /// replica's specific `AVAILABILITY ZONE`, if any.
    FromReplica(Option<String>),
}

/// The location of a managed replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The SQL size parameter used for allocation.
    pub size: String,
    /// If `true`, Materialize support owns this replica.
    pub internal: bool,
    /// Optional SQL size parameter used for billing.
    pub billed_as: Option<String>,
    /// The replica's availability zones, if specified.
    ///
    /// This is either the replica's specific `AVAILABILITY ZONE`,
    /// or the zones placed here during replica concretization
    /// from the `MANAGED` cluster config.
    ///
    /// We skip serialization (which is used for some validation
    /// in tests) as the latter case is a "virtual" piece of information
    /// that exists only at runtime.
    ///
    /// An empty list of availability zones is concretized as `None`,
    /// as the on-disk serialization of `MANAGED CLUSTER AVAILABILITY ZONES`
    /// is an empty list if none are specified.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending reconfiguration.
    pub pending: bool,
}

impl ManagedReplicaLocation {
    /// Returns the size which should be used to determine billing-related information.
    pub fn size_for_billing(&self) -> &str {
        self.billed_as.as_deref().unwrap_or(&self.size)
    }
}
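
// An illustrative test sketch (not part of the original file): `size_for_billing`
// should fall back to the actual size when `billed_as` is unset and prefer
// `billed_as` when it is set. The sizes "small" and "large" and the allocation
// JSON below are hypothetical.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // `Numeric` deserialization calls foreign functions unsupported by Miri.
fn test_size_for_billing_prefers_billed_as() {
    let allocation: ReplicaAllocation = serde_json::from_str(
        r#"{
            "cpu_limit": 1.0,
            "memory_limit": "1GiB",
            "disk_limit": "1GiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "1"
        }"#,
    )
    .expect("valid replica allocation");

    let mut location = ManagedReplicaLocation {
        allocation,
        size: "small".into(),
        internal: false,
        billed_as: None,
        availability_zones: ManagedReplicaAvailabilityZones::FromCluster(None),
        pending: false,
    };
    assert_eq!(location.size_for_billing(), "small");

    location.billed_as = Some("large".into());
    assert_eq!(location.size_for_billing(), "large");
}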

/// Configures logging for a cluster replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;

/// An event describing a change in status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub process_id: ProcessId,
    pub status: ClusterStatus,
    pub time: DateTime<Utc>,
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
{
    /// Creates a cluster with the specified identifier and configuration.
    ///
    /// A cluster is a combination of a storage instance and a compute instance.
    /// A cluster has zero or more replicas; each replica colocates the storage
    /// and compute layers on the same physical resources.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class for a cluster.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the specified cluster.
    ///
    /// # Panics
    ///
    /// Panics if the cluster still has replicas.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster with the specified identifier
    /// and configuration.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                computectl_addrs,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    cluster_name,
                    replica_name,
                    role,
                    m,
                    enable_worker_core_affinity,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                };
                metrics_task = Some(metrics_task_join_handle);
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops the specified replica of the specified cluster.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // We unconditionally deprovision even for unmanaged replicas to avoid
        // needing to keep track of which replicas are managed and which are
        // unmanaged. Deprovisioning is a no-op if the replica ID was never
        // provisioned.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Removes replicas from past generations in a background task.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Removes replicas that are orphaned in the current generation.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // We limit our attention here to replicas from the current deploy
            // generation. Replicas from past generations are cleaned up during
            // `Controller::allow_writes`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // Found a replica in the orchestrator with a higher replica ID
                // than what we are aware of. This must have been created by an
                // environmentd that's competing for control of this generation.
                // Abort to let the other process have full control.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of events reporting status changes of replica processes
    /// in the current deploy generation.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a replica with the service orchestrator.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();

        // TODO(teskje): use the same values as for compute?
        let storage_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: 1337,
            ..Default::default()
        };
        let compute_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
            ..Default::default()
        };

        let mut disk_limit = location.allocation.disk_limit;
        let memory_limit = location.allocation.memory_limit;
        let mut memory_request = None;

        if location.allocation.swap_enabled {
            // The disk limit we specify in the service config decides whether or not the replica
            // gets a scratch disk attached. We want to avoid attaching disks to swap replicas, so
            // make sure to set the disk limit accordingly.
            disk_limit = Some(DiskLimit::ZERO);

            // We want to keep the memory request equal to the memory limit, to avoid
            // over-provisioning and ensure replicas have predictable performance. However, to
            // enable swap, Kubernetes currently requires that request and limit are different.
            memory_request = memory_limit.map(|MemoryLimit(limit)| {
                let request = ByteSize::b(limit.as_u64() - 1);
                MemoryLimit(request)
            });
        }
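        // Illustrative note (not from the original source): with a 10GiB memory limit,
        // the request becomes 10 * 1024^3 - 1 = 10_737_418_239 bytes, just enough of a
        // difference for Kubernetes to treat request and limit as distinct.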

        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    let storage_timely_config = TimelyConfig {
                        workers: location.allocation.workers,
                        addresses: assigned.peer_addresses("storage"),
                        ..storage_proto_timely_config
                    };
                    let compute_timely_config = TimelyConfig {
                        workers: location.allocation.workers,
                        addresses: assigned.peer_addresses("compute"),
                        ..compute_proto_timely_config
                    };

                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned.listen_addrs["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned.listen_addrs["computectl"]
                        ),
                        format!(
                            "--internal-http-listen-addr={}",
                            assigned.listen_addrs["internal-http"]
                        ),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                        format!(
                            "--storage-timely-config={}",
                            storage_timely_config.to_string(),
                        ),
                        format!(
                            "--compute-timely-config={}",
                            compute_timely_config.to_string(),
                        ),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    // If swap is enabled, make the replica limit its own heap usage based on the
                    // configured memory and disk limits.
                    if location.allocation.swap_enabled
                        && let Some(memory_limit) = location.allocation.memory_limit
                        && let Some(disk_limit) = location.allocation.disk_limit
                        // Currently, the way for replica sizes to request unlimited swap is to
                        // specify a `disk_limit` of 0. Ideally we'd change this to make them
                        // specify no disk limit instead, but for now we need to special-case here.
                        && disk_limit != DiskLimit::ZERO
                    {
                        let heap_limit = memory_limit.0 + disk_limit.0;
                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
                    }
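                    // Illustrative note (not from the original source): a 16GiB memory
                    // limit combined with a 32GiB disk limit yields a 48GiB heap limit.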

                    args.extend(secrets_args.clone());
                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    // To simplify the changes to tests, the port
                    // chosen here is _after_ the compute ones.
                    // TODO(petrosagg): fix the numerical ordering here
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                memory_limit,
                memory_request,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    (
                        "size".into(),
                        location
                            .size
                            .to_string()
                            .replace("=", "-")
                            .replace(",", "_"),
                    ),
                ]),
                annotations: BTreeMap::from([
                    ("replica-name".into(), replica_name),
                    ("cluster-name".into(), cluster_name),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // This provides the orchestrator with some label selectors that
                // are used to constrain the scheduling of replicas, based on its
                // internal configuration.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    // Select other replicas (but not oneself)
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    // Select ALL replicas.
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // TODO[btv] -- I tried implementing a `watch_metrics` function,
                // similar to `watch_services`, but it crashed due to
                // https://github.com/kube-rs/kube/issues/1092 .
                //
                // If `metrics-server` can be made to fill in `resourceVersion`,
                // or if that bug is fixed, we can try that again rather than using this inelegant
                // loop.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica with the service orchestrator.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}

/// Removes all replicas from past generations.
async fn try_remove_past_generation_replicas(
    orchestrator: &dyn NamespacedOrchestrator,
    deploy_generation: u64,
) -> Result<(), anyhow::Error> {
    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();

    for service in services {
        let name: ReplicaServiceName = service.parse()?;
        if name.generation < deploy_generation {
            info!(
                cluster_id = %name.cluster_id,
                replica_id = %name.replica_id,
                "removing past generation replica",
            );
            orchestrator.drop_service(&service)?;
        }
    }

    Ok(())
}

/// Represents the name of a cluster replica service in the orchestrator.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The deploy generation of the environment that created the replica.
    pub generation: u64,
}

impl fmt::Display for ReplicaServiceName {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } = self;
        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
    }
}

impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            // Old versions of Materialize did not include generations in
            // replica service names. Synthesize generation 0 if absent.
            // TODO: remove this in the next version of Materialize.
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}
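
// An illustrative round-trip test (a sketch, not part of the original file): replica
// service names render as `<cluster>-replica-<replica>-gen-<generation>`, and legacy
// names without a generation suffix parse as generation 0. The IDs used below are
// hypothetical.
#[mz_ore::test]
fn test_replica_service_name_roundtrip() {
    let parsed: ReplicaServiceName = "u1-replica-u3-gen-2".parse().expect("valid service name");
    assert_eq!(parsed.generation, 2);
    assert_eq!(parsed.to_string(), "u1-replica-u3-gen-2");

    // Names from before generations were introduced parse as generation 0.
    let legacy: ReplicaServiceName = "s1-replica-s2".parse().expect("valid legacy name");
    assert_eq!(legacy.generation, 0);
    assert_eq!(legacy.to_string(), "s1-replica-s2-gen-0");
}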