mz_controller/clusters.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Cluster management.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::anyhow;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt};
use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
use mz_compute_client::controller::ComputeControllerTimestamp;
use mz_compute_client::logging::LogVariant;
use mz_compute_client::service::{ComputeClient, ComputeGrpcClient};
use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
use mz_controller_types::dyncfgs::{
    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
    ENABLE_CTP_CLUSTER_PROTOCOLS, ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC,
    TIMELY_ZERO_COPY_LIMIT,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_orchestrator::NamespacedOrchestrator;
use mz_orchestrator::{
    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
    ServiceEvent, ServicePort,
};
use mz_ore::halt;
use mz_ore::instrument;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_repr::GlobalId;
use mz_repr::adt::numeric::Numeric;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::time;
use tracing::{error, info, warn};

use crate::Controller;

/// Configures a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to register
    /// the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional arbitrary string that describes the class of the workload
    /// this cluster is running (e.g., `production` or `staging`).
    pub workload_class: Option<String>,
}
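
// An illustrative sketch (not exercised elsewhere in this file): a minimal
// `ClusterConfig` with no arranged logs and an example workload class label.
#[mz_ore::test]
fn test_cluster_config_construction() {
    let config = ClusterConfig {
        arranged_logs: BTreeMap::new(),
        workload_class: Some("production".to_string()),
    };
    assert!(config.arranged_logs.is_empty());
    assert_eq!(config.workload_class.as_deref(), Some("production"));
}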

/// The status of a cluster.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;

/// Configures a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica.
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}

/// Configures the resource allocation for a cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: u16,
    /// The number of worker threads per process in the replica.
    pub workers: usize,
    /// The number of credits per hour that the replica consumes.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process has exclusive access to its CPU cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether instances of this type use swap as the spill-to-disk mechanism.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether instances of this type can be created.
    #[serde(default)]
    pub disabled: bool,
    /// Additional node selectors.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}

fn default_true() -> bool {
    true
}

#[mz_ore::test]
// We test this particularly because we deserialize values from strings.
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;

    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: 16,
            workers: 1,
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 0,
            "workers": 0,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: 0,
            workers: 0,
            selectors: Default::default(),
        }
    );
}

/// Configures the location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica.
    Managed(ManagedReplicaLocation),
}

impl ReplicaLocation {
    /// Returns the number of processes specified by this replica location.
    pub fn num_processes(&self) -> usize {
        match self {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                computectl_addrs, ..
            }) => computectl_addrs.len(),
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                allocation.scale.into()
            }
        }
    }

    /// Returns the SQL size parameter used for billing, if one was specified.
    pub fn billed_as(&self) -> Option<&str> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
                billed_as.as_deref()
            }
            _ => None,
        }
    }

    /// Reports whether Materialize support owns this replica.
    pub fn internal(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
            ReplicaLocation::Unmanaged(_) => false,
        }
    }

    /// Returns the total number of worker threads specified by this replica
    /// location, across all processes.
    ///
    /// `None` for unmanaged replicas, whose worker count we don't know.
    pub fn workers(&self) -> Option<usize> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                Some(allocation.workers * self.num_processes())
            }
            ReplicaLocation::Unmanaged(_) => None,
        }
    }

    /// Reports whether this is a pending replica.
    ///
    /// A pending replica is created as part of an `ALTER CLUSTER` on a managed
    /// cluster. The configuration of a pending replica will not match that of
    /// the cluster until the alter has been finalized, promoting the pending
    /// replicas and setting this value to `false`.
    pub fn pending(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
            _ => false,
        }
    }
}
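
// An illustrative sketch of the `ReplicaLocation` accessors above: the total
// worker count of a managed replica is the per-process worker count
// multiplied by the number of processes (`scale`). The allocation values and
// size name used here are arbitrary examples.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // the `Numeric` type calls foreign functions
fn test_managed_replica_location_accessors() {
    let location = ReplicaLocation::Managed(ManagedReplicaLocation {
        allocation: ReplicaAllocation {
            memory_limit: None,
            cpu_limit: None,
            disk_limit: None,
            scale: 2,
            workers: 4,
            credits_per_hour: 1.into(),
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: false,
            disabled: false,
            selectors: BTreeMap::new(),
        },
        size: "small".to_string(),
        internal: false,
        billed_as: None,
        availability_zones: ManagedReplicaAvailabilityZones::FromCluster(None),
        pending: false,
    });

    // Two processes with four workers each yield eight workers in total.
    assert_eq!(location.num_processes(), 2);
    assert_eq!(location.workers(), Some(8));
    // No `billed_as` override, and neither a support-owned nor a pending replica.
    assert_eq!(location.billed_as(), None);
    assert!(!location.internal());
    assert!(!location.pending());
}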

/// The "role" of a cluster, which is currently used to determine the
/// severity of alerts for problems with its replicas.
pub enum ClusterRole {
    /// The existence and proper functioning of the cluster's replicas is
    /// business-critical for Materialize.
    SystemCritical,
    /// Assuming no bugs, the cluster's replicas should always exist and
    /// function properly. If they don't, however, that is less urgent than
    /// it would be for a `SystemCritical` replica.
    System,
    /// The cluster is controlled by the user, and might go down for
    /// reasons outside our control (e.g., OOMs).
    User,
}

/// The location of an unmanaged replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    pub computectl_addrs: Vec<String>,
}

/// Information about availability zone constraints for replicas.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Used if the replica belongs to a `MANAGED` cluster. Carries the
    /// cluster's `AVAILABILITY ZONES` constraint, if any. Empty lists are
    /// represented as `None`.
    FromCluster(Option<Vec<String>>),
    /// Used if the replica belongs to a non-`MANAGED` cluster. Carries the
    /// replica's specific `AVAILABILITY ZONE`, if any.
    FromReplica(Option<String>),
}

/// The location of a managed replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// The SQL size parameter used for allocation.
    pub size: String,
    /// If `true`, Materialize support owns this replica.
    pub internal: bool,
    /// Optional SQL size parameter used for billing.
    pub billed_as: Option<String>,
    /// The replica's availability zones, if specified.
    ///
    /// This is either the replica's specific `AVAILABILITY ZONE`,
    /// or the zones placed here during replica concretization
    /// from the `MANAGED` cluster config.
    ///
    /// We skip serialization (which is used for some validation in tests)
    /// because the latter case is a "virtual" piece of information that
    /// exists only at runtime.
    ///
    /// An empty list of availability zones is concretized as `None`,
    /// as the on-disk serialization of `MANAGED CLUSTER AVAILABILITY ZONES`
    /// is an empty list if none are specified.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending reconfiguration.
    pub pending: bool,
}

impl ManagedReplicaLocation {
    /// Returns the size that should be used to determine billing-related information.
    pub fn size_for_billing(&self) -> &str {
        self.billed_as.as_deref().unwrap_or(&self.size)
    }
}
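
// For example (with hypothetical size names): a replica with `size: "large"`
// and `billed_as: Some("small".into())` reports "small" from
// `size_for_billing`, while a replica without a `billed_as` override reports
// its own `size`.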

/// Configures logging for a cluster replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;

/// An event describing a change in status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The ID of the cluster.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The ID of the process within the replica.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// The time at which the status change occurred.
    pub time: DateTime<Utc>,
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
    ComputeGrpcClient: ComputeClient<T>,
{
    /// Creates a cluster with the specified identifier and configuration.
    ///
    /// A cluster is a combination of a storage instance and a compute instance.
    /// A cluster has zero or more replicas; each replica colocates the storage
    /// and compute layers on the same physical resources.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class for a cluster.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the specified cluster.
    ///
    /// # Panics
    ///
    /// Panics if the cluster still has replicas.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster with the specified identifier
    /// and configuration.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        // For controller-replica communication, we select between CTP and gRPC based on a dyncfg.
        // We need to be careful to pass the same value to both the orchestrator and the
        // sub-controllers, to ensure they agree on which protocol to use.
        let enable_ctp = ENABLE_CTP_CLUSTER_PROTOCOLS.get(&self.dyncfg);

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                computectl_addrs,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    cluster_name,
                    replica_name,
                    role,
                    m,
                    enable_worker_core_affinity,
                    enable_ctp,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                };
                metrics_task = Some(metrics_task_join_handle);
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location, enable_ctp);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
            enable_ctp,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops the specified replica of the specified cluster.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // We unconditionally deprovision even for unmanaged replicas to avoid
        // needing to keep track of which replicas are managed and which are
        // unmanaged. Deprovisioning is a no-op if the replica ID was never
        // provisioned.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Removes replicas from past generations in a background task.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Removes replicas that are orphaned in the current generation.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // We limit our attention here to replicas from the current deploy
            // generation. Replicas from past generations are cleaned up during
            // `Controller::allow_writes`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // Found a replica in the orchestrator with a higher replica ID
                // than what we are aware of. This must have been created by an
                // environmentd that's competing for control of this generation.
                // Abort to let the other process have full control.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of events reporting replica status changes in the
    /// current deploy generation.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a replica with the service orchestrator.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
        enable_ctp: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();

        // TODO(teskje): use the same values as for compute?
        let storage_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: 1337,
            ..Default::default()
        };
        let compute_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
            ..Default::default()
        };

        let mut disk_limit = location.allocation.disk_limit;
        let memory_limit = location.allocation.memory_limit;
        let mut memory_request = None;

        if location.allocation.swap_enabled {
            // The disk limit we specify in the service config decides whether or not the replica
            // gets a scratch disk attached. We want to avoid attaching disks to swap replicas, so
            // make sure to set the disk limit accordingly.
            disk_limit = Some(DiskLimit::ZERO);

            // We want to keep the memory request equal to the memory limit, to avoid
            // over-provisioning and ensure replicas have predictable performance. However, to
            // enable swap, Kubernetes currently requires that request and limit are different.
            memory_request = memory_limit.map(|MemoryLimit(limit)| {
                let request = ByteSize::b(limit.as_u64() - 1);
                MemoryLimit(request)
            });
        }

        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    let storage_timely_config = TimelyConfig {
                        workers: location.allocation.workers,
                        addresses: assigned.peer_addresses("storage"),
                        ..storage_proto_timely_config
                    };
                    let compute_timely_config = TimelyConfig {
                        workers: location.allocation.workers,
                        addresses: assigned.peer_addresses("compute"),
                        ..compute_proto_timely_config
                    };

                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned.listen_addrs["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned.listen_addrs["computectl"]
                        ),
                        format!(
                            "--internal-http-listen-addr={}",
                            assigned.listen_addrs["internal-http"]
                        ),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                        format!(
                            "--storage-timely-config={}",
                            storage_timely_config.to_string(),
                        ),
                        format!(
                            "--compute-timely-config={}",
                            compute_timely_config.to_string(),
                        ),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    if enable_ctp {
                        args.push("--use-ctp".into());
                    }

                    args.extend(secrets_args.clone());
                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    // To simplify the changes to tests, the port
                    // chosen here is _after_ the compute ones.
                    // TODO(petrosagg): fix the numerical ordering here
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                memory_limit,
                memory_request,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    (
                        "size".into(),
                        location
                            .size
                            .to_string()
                            .replace("=", "-")
                            .replace(",", "_"),
                    ),
                ]),
                annotations: BTreeMap::from([
                    ("replica-name".into(), replica_name),
                    ("cluster-name".into(), cluster_name),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // This provides the orchestrator with some label selectors that
                // are used to constrain the scheduling of replicas, based on
                // its internal configuration.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    // Select other replicas (but not oneself)
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    // Select ALL replicas.
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // TODO[btv] -- I tried implementing a `watch_metrics` function,
                // similar to `watch_services`, but it crashed due to
                // https://github.com/kube-rs/kube/issues/1092 .
                //
                // If `metrics-server` can be made to fill in `resourceVersion`,
                // or if that bug is fixed, we can try that again rather than using this inelegant
                // loop.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica with the service orchestrator.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}

/// Removes all replicas from past generations.
async fn try_remove_past_generation_replicas(
    orchestrator: &dyn NamespacedOrchestrator,
    deploy_generation: u64,
) -> Result<(), anyhow::Error> {
    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();

    for service in services {
        let name: ReplicaServiceName = service.parse()?;
        if name.generation < deploy_generation {
            info!(
                cluster_id = %name.cluster_id,
                replica_id = %name.replica_id,
                "removing past generation replica",
            );
            orchestrator.drop_service(&service)?;
        }
    }

    Ok(())
}

/// Represents the name of a cluster replica service in the orchestrator.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica.
    pub replica_id: ReplicaId,
    /// The deploy generation of the environment that provisioned the replica.
    pub generation: u64,
}

impl fmt::Display for ReplicaServiceName {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } = self;
        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
    }
}

impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            // Old versions of Materialize did not include generations in
            // replica service names. Synthesize generation 0 if absent.
            // TODO: remove this in the next version of Materialize.
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}
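
// An illustrative parsing sketch for `ReplicaServiceName`, assuming the usual
// `u<N>`/`s<N>` textual form of cluster and replica IDs.
#[mz_ore::test]
fn test_replica_service_name_parsing() {
    // A current-format name with an explicit generation round-trips.
    let name: ReplicaServiceName = "u3-replica-u7-gen-2".parse().expect("valid service name");
    assert_eq!(name.generation, 2);
    assert_eq!(name.to_string(), "u3-replica-u7-gen-2");

    // Names from old versions that lack a generation parse as generation 0.
    let legacy: ReplicaServiceName = "u3-replica-u7".parse().expect("valid legacy service name");
    assert_eq!(legacy.generation, 0);

    // Anything else is rejected.
    assert!("not-a-replica-service".parse::<ReplicaServiceName>().is_err());
}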