// mz_controller/src/clusters.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Cluster management.
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::controller::ComputeControllerTimestamp;
26use mz_compute_client::logging::LogVariant;
27use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
28use mz_controller_types::dyncfgs::{
29    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
30    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
31};
32use mz_controller_types::{ClusterId, ReplicaId};
33use mz_orchestrator::NamespacedOrchestrator;
34use mz_orchestrator::{
35    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
36    ServiceEvent, ServicePort,
37};
38use mz_ore::cast::CastInto;
39use mz_ore::task::{self, AbortOnDropHandle};
40use mz_ore::{halt, instrument};
41use mz_repr::GlobalId;
42use mz_repr::adt::numeric::Numeric;
43use regex::Regex;
44use serde::{Deserialize, Serialize};
45use tokio::time;
46use tracing::{error, info, warn};
47
48use crate::Controller;
49
/// Configures a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to register
    /// the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional arbitrary string that describes the class of the workload
    /// this cluster is running (e.g., `production` or `staging`).
    ///
    /// The workload class is passed on to both the storage and the compute
    /// instance backing the cluster.
    pub workload_class: Option<String>,
}
61
/// The status of a cluster.
///
/// An alias for the orchestrator's service status, since a cluster replica is
/// backed by an orchestrator service.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
64
/// Configures a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica (managed or unmanaged).
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}
73
/// Configures the resource allocation for a cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The memory request for each process in the replica.
    pub memory_request: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The CPU request for each process in the replica.
    pub cpu_request: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: NonZero<u16>,
    /// The number of worker threads in the replica.
    pub workers: NonZero<usize>,
    /// The number of credits per hour that the replica consumes.
    ///
    /// Deserialized from a string, to preserve decimal precision.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process has exclusive access to its CPU cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether instances of this type use swap as the spill-to-disk mechanism.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether instances of this type can be created.
    #[serde(default)]
    pub disabled: bool,
    /// Additional node selectors.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
111
/// Serde default helper: returns `true`, for fields (such as
/// `ReplicaAllocation::is_cc`) that should default to `true` when absent.
fn default_true() -> bool {
    true
}
115
#[mz_ore::test]
// We test this particularly because we deserialize values from strings.
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;
    use mz_ore::{assert_err, assert_ok};

    // A typical allocation. Fields omitted from the JSON (`memory_request`,
    // `cpu_request`, `cpu_exclusive`, `is_cc`, `disabled`) must assume their
    // serde defaults.
    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            memory_request: None,
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_request: None,
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: NonZero::new(16).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    // Zero-valued resource limits deserialize successfully; only `scale` and
    // `workers` are required to be non-zero.
    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            memory_request: None,
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_request: None,
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: NonZero::new(1).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: Default::default(),
        }
    );

    // `scale` and `workers` must be non-zero.
    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
}
205
/// Configures the location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica, reachable at user-provided network addresses.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica, provisioned through the service orchestrator.
    Managed(ManagedReplicaLocation),
}
214
215impl ReplicaLocation {
216    /// Returns the number of processes specified by this replica location.
217    pub fn num_processes(&self) -> usize {
218        match self {
219            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
220                computectl_addrs, ..
221            }) => computectl_addrs.len(),
222            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
223                allocation.scale.cast_into()
224            }
225        }
226    }
227
228    pub fn billed_as(&self) -> Option<&str> {
229        match self {
230            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
231                billed_as.as_deref()
232            }
233            ReplicaLocation::Unmanaged(_) => None,
234        }
235    }
236
237    pub fn internal(&self) -> bool {
238        match self {
239            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
240            ReplicaLocation::Unmanaged(_) => false,
241        }
242    }
243
244    /// Returns the number of workers specified by this replica location.
245    ///
246    /// `None` for unmanaged replicas, whose worker count we don't know.
247    pub fn workers(&self) -> Option<usize> {
248        match self {
249            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
250                Some(allocation.workers.get() * self.num_processes())
251            }
252            ReplicaLocation::Unmanaged(_) => None,
253        }
254    }
255
256    /// A pending replica is created as part of an alter cluster of an managed
257    /// cluster. the configuration of a pending replica will not match that of
258    /// the clusters until the alter has been finalized promoting the pending
259    /// replicas and setting this value to false.
260    pub fn pending(&self) -> bool {
261        match self {
262            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
263            ReplicaLocation::Unmanaged(_) => false,
264        }
265    }
266}
267
/// The "role" of a cluster, which is currently used to determine the
/// severity of alerts for problems with its replicas.
#[derive(Debug, Clone)]
pub enum ClusterRole {
    /// The existence and proper functioning of the cluster's replicas is
    /// business-critical for Materialize.
    SystemCritical,
    /// Assuming no bugs, the cluster's replicas should always exist and
    /// function properly. If they don't, however, that is less urgent than
    /// would be the case for a `SystemCritical` replica.
    System,
    /// The cluster is controlled by the user, and might go down for
    /// reasons outside our control (e.g., OOMs).
    User,
}
283
/// The location of an unmanaged replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    ///
    /// One entry per process; the length of this list defines the replica's
    /// process count.
    pub computectl_addrs: Vec<String>,
}
294
/// Information about availability zone constraints for replicas.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Specified if the `Replica` is from a `MANAGED` cluster,
    /// and specifies if there is an `AVAILABILITY ZONES`
    /// constraint. Empty lists are represented as `None`.
    FromCluster(Option<Vec<String>>),
    /// Specified if the `Replica` is from a non-`MANAGED` cluster,
    /// and specifies if there is a specific `AVAILABILITY ZONE`.
    FromReplica(Option<String>),
}
306
/// The location of a managed replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// SQL size parameter used for allocation.
    pub size: String,
    /// If `true`, Materialize support owns this replica.
    pub internal: bool,
    /// Optional SQL size parameter used for billing.
    pub billed_as: Option<String>,
    /// The replica's availability zones, if specified.
    ///
    /// This is either the replica's specific `AVAILABILITY ZONE`,
    /// or the zones placed here during replica concretization
    /// from the `MANAGED` cluster config.
    ///
    /// We skip serialization (which is used for some validation
    /// in tests) as the latter case is a "virtual" piece of information,
    /// that exists only at runtime.
    ///
    /// An empty list of availability zones is concretized as `None`,
    /// as the on-disk serialization of `MANAGED CLUSTER AVAILABILITY ZONES`
    /// is an empty list if none are specified.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending reconfiguration.
    pub pending: bool,
}
336
337impl ManagedReplicaLocation {
338    /// Return the size which should be used to determine billing-related information.
339    pub fn size_for_billing(&self) -> &str {
340        self.billed_as.as_deref().unwrap_or(&self.size)
341    }
342}
343
/// Configures logging for a cluster replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
///
/// Used to tag `ClusterEvent`s with the affected process.
pub type ProcessId = u64;
349
/// An event describing a change in status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    /// The ID of the replica whose process changed status.
    pub replica_id: ReplicaId,
    /// The process within the replica that changed status.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// The time at which the status change occurred.
    pub time: DateTime<Utc>,
}
359
360impl<T> Controller<T>
361where
362    T: ComputeControllerTimestamp,
363{
    /// Creates a cluster with the specified identifier and configuration.
    ///
    /// A cluster is a combination of a storage instance and a compute instance.
    /// A cluster has zero or more replicas; each replica colocates the storage
    /// and compute layers on the same physical resources.
    ///
    /// # Errors
    ///
    /// Returns an error if the compute instance cannot be created.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        // Create the storage instance first (infallible), then the compute
        // instance. Both receive the cluster's workload class.
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }
380
    /// Updates the workload class for a cluster.
    ///
    /// The new workload class is propagated to both the storage and the
    /// compute instance backing the cluster.
    ///
    /// # Panics
    ///
    /// Panics if the instance does not exist in the StorageController or the ComputeController.
    pub fn update_cluster_workload_class(&mut self, id: ClusterId, workload_class: Option<String>) {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)
            .expect("instance exists");
    }
393
    /// Drops the specified cluster.
    ///
    /// Removes both the storage and the compute instance backing the cluster.
    ///
    /// # Panics
    ///
    /// Panics if the cluster still has replicas.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }
403
404    /// Creates a replica of the specified cluster with the specified identifier
405    /// and configuration.
406    pub fn create_replica(
407        &mut self,
408        cluster_id: ClusterId,
409        replica_id: ReplicaId,
410        cluster_name: String,
411        replica_name: String,
412        role: ClusterRole,
413        config: ReplicaConfig,
414        enable_worker_core_affinity: bool,
415    ) -> Result<(), anyhow::Error> {
416        let storage_location: ClusterReplicaLocation;
417        let compute_location: ClusterReplicaLocation;
418        let metrics_task: Option<AbortOnDropHandle<()>>;
419
420        match config.location {
421            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
422                storagectl_addrs,
423                computectl_addrs,
424            }) => {
425                compute_location = ClusterReplicaLocation {
426                    ctl_addrs: computectl_addrs,
427                };
428                storage_location = ClusterReplicaLocation {
429                    ctl_addrs: storagectl_addrs,
430                };
431                metrics_task = None;
432            }
433            ReplicaLocation::Managed(m) => {
434                let (service, metrics_task_join_handle) = self.provision_replica(
435                    cluster_id,
436                    replica_id,
437                    cluster_name,
438                    replica_name,
439                    role,
440                    m,
441                    enable_worker_core_affinity,
442                )?;
443                storage_location = ClusterReplicaLocation {
444                    ctl_addrs: service.addresses("storagectl"),
445                };
446                compute_location = ClusterReplicaLocation {
447                    ctl_addrs: service.addresses("computectl"),
448                };
449                metrics_task = Some(metrics_task_join_handle);
450
451                // Register the replica for HTTP proxying.
452                let http_addresses = service.addresses("internal-http");
453                self.replica_http_locator
454                    .register_replica(cluster_id, replica_id, http_addresses);
455            }
456        }
457
458        self.storage
459            .connect_replica(cluster_id, replica_id, storage_location);
460        self.compute.add_replica_to_instance(
461            cluster_id,
462            replica_id,
463            compute_location,
464            config.compute,
465        )?;
466
467        if let Some(task) = metrics_task {
468            self.metrics_tasks.insert(replica_id, task);
469        }
470
471        Ok(())
472    }
473
    /// Drops the specified replica of the specified cluster.
    ///
    /// Deprovisions any orchestrator resources backing the replica, stops its
    /// metrics collection task, unregisters it from HTTP proxying, and
    /// disconnects it from the compute and storage controllers.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // We unconditionally deprovision even for unmanaged replicas to avoid
        // needing to keep track of which replicas are managed and which are
        // unmanaged. Deprovisioning is a no-op if the replica ID was never
        // provisioned.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        // Dropping the `AbortOnDropHandle` aborts the metrics task, if any.
        self.metrics_tasks.remove(&replica_id);

        // Remove HTTP addresses from the locator.
        self.replica_http_locator
            .remove_replica(cluster_id, replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }
495
    /// Removes replicas from past generations in a background task.
    ///
    /// The spawned task retries indefinitely until cleanup succeeds, sleeping
    /// between attempts for the interval configured by
    /// `CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL`.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        // Capture everything the task needs up front, so it doesn't borrow
        // `self`.
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            // Re-read the interval on every attempt, so dyncfg
                            // changes take effect without a restart.
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }
525
    /// Remove replicas that are orphaned in the current generation.
    ///
    /// A replica is considered orphaned when its service exists in the
    /// orchestrator but this controller has no record of it (i.e., no metrics
    /// task is registered for its ID).
    ///
    /// `next_user_replica_id` and `next_system_replica_id` are the lowest
    /// replica IDs not yet allocated; encountering an orchestrator replica at
    /// or above these bounds means another process controls this generation,
    /// and this process halts.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        // The replicas this controller knows about: one metrics task is
        // registered per known replica.
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        // The replica services that actually exist in the orchestrator.
        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // We limit our attention here to replicas from the current deploy
            // generation. Replicas from past generations are cleaned up during
            // `Controller::allow_writes`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // Found a replica in the orchestrator with a higher replica ID
                // than what we are aware of. This must have been created by an
                // environmentd that's competing for control of this generation.
                // Abort to let the other process have full control.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }
579
580    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
581        let deploy_generation = self.deploy_generation;
582
583        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
584            let ReplicaServiceName {
585                cluster_id,
586                replica_id,
587                generation: replica_generation,
588                ..
589            } = event.service_id.parse()?;
590
591            let event = ClusterEvent {
592                cluster_id,
593                replica_id,
594                process_id: event.process_id,
595                status: event.status,
596                time: event.time,
597            };
598
599            Ok((event, replica_generation))
600        }
601
602        let stream = self
603            .orchestrator
604            .watch_services()
605            .map(|event| event.and_then(translate_event))
606            .filter_map(move |event| async move {
607                match event {
608                    Ok((event, replica_generation)) => {
609                        if replica_generation == deploy_generation {
610                            Some(event)
611                        } else {
612                            None
613                        }
614                    }
615                    Err(error) => {
616                        error!("service watch error: {error}");
617                        None
618                    }
619                }
620            });
621
622        Box::pin(stream)
623    }
624
625    /// Provisions a replica with the service orchestrator.
626    fn provision_replica(
627        &self,
628        cluster_id: ClusterId,
629        replica_id: ReplicaId,
630        cluster_name: String,
631        replica_name: String,
632        role: ClusterRole,
633        location: ManagedReplicaLocation,
634        enable_worker_core_affinity: bool,
635    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
636        let service_name = ReplicaServiceName {
637            cluster_id,
638            replica_id,
639            generation: self.deploy_generation,
640        }
641        .to_string();
642        let role_label = match role {
643            ClusterRole::SystemCritical => "system-critical",
644            ClusterRole::System => "system",
645            ClusterRole::User => "user",
646        };
647        let environment_id = self.connection_context().environment_id.clone();
648        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
649        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
650        let persist_pubsub_url = self.persist_pubsub_url.clone();
651        let secrets_args = self.secrets_args.to_flags();
652
653        // TODO(teskje): use the same values as for compute?
654        let storage_proto_timely_config = TimelyConfig {
655            arrangement_exert_proportionality: 1337,
656            ..Default::default()
657        };
658        let compute_proto_timely_config = TimelyConfig {
659            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
660            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
661            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
662            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
663            ..Default::default()
664        };
665
666        let mut disk_limit = location.allocation.disk_limit;
667        let memory_limit = location.allocation.memory_limit;
668        let mut memory_request = None;
669
670        if location.allocation.swap_enabled {
671            // The disk limit we specify in the service config decides whether or not the replica
672            // gets a scratch disk attached. We want to avoid attaching disks to swap replicas, so
673            // make sure to set the disk limit accordingly.
674            disk_limit = Some(DiskLimit::ZERO);
675
676            // We want to keep the memory request equal to the memory limit, to avoid
677            // over-provisioning and ensure replicas have predictable performance. However, to
678            // enable swap, Kubernetes currently requires that request and limit are different.
679            memory_request = memory_limit.map(|MemoryLimit(limit)| {
680                let request = ByteSize::b(limit.as_u64() - 1);
681                MemoryLimit(request)
682            });
683        }
684
685        let service = self.orchestrator.ensure_service(
686            &service_name,
687            ServiceConfig {
688                image: self.clusterd_image.clone(),
689                init_container_image: self.init_container_image.clone(),
690                args: Box::new(move |assigned| {
691                    let storage_timely_config = TimelyConfig {
692                        workers: location.allocation.workers.get(),
693                        addresses: assigned.peer_addresses("storage"),
694                        ..storage_proto_timely_config
695                    };
696                    let compute_timely_config = TimelyConfig {
697                        workers: location.allocation.workers.get(),
698                        addresses: assigned.peer_addresses("compute"),
699                        ..compute_proto_timely_config
700                    };
701
702                    let mut args = vec![
703                        format!(
704                            "--storage-controller-listen-addr={}",
705                            assigned.listen_addrs["storagectl"]
706                        ),
707                        format!(
708                            "--compute-controller-listen-addr={}",
709                            assigned.listen_addrs["computectl"]
710                        ),
711                        format!(
712                            "--internal-http-listen-addr={}",
713                            assigned.listen_addrs["internal-http"]
714                        ),
715                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
716                        format!("--opentelemetry-resource=replica_id={}", replica_id),
717                        format!("--persist-pubsub-url={}", persist_pubsub_url),
718                        format!("--environment-id={}", environment_id),
719                        format!(
720                            "--storage-timely-config={}",
721                            storage_timely_config.to_string(),
722                        ),
723                        format!(
724                            "--compute-timely-config={}",
725                            compute_timely_config.to_string(),
726                        ),
727                    ];
728                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
729                        args.push(format!(
730                            "--aws-external-id-prefix={}",
731                            aws_external_id_prefix
732                        ));
733                    }
734                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
735                        args.push(format!(
736                            "--aws-connection-role-arn={}",
737                            aws_connection_role_arn
738                        ));
739                    }
740                    if let Some(memory_limit) = location.allocation.memory_limit {
741                        args.push(format!(
742                            "--announce-memory-limit={}",
743                            memory_limit.0.as_u64()
744                        ));
745                    }
746                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
747                        args.push("--worker-core-affinity".into());
748                    }
749                    if location.allocation.is_cc {
750                        args.push("--is-cc".into());
751                    }
752
753                    // If swap is enabled, make the replica limit its own heap usage based on the
754                    // configured memory and disk limits.
755                    if location.allocation.swap_enabled
756                        && let Some(memory_limit) = location.allocation.memory_limit
757                        && let Some(disk_limit) = location.allocation.disk_limit
758                        // Currently, the way for replica sizes to request unlimited swap is to
759                        // specify a `disk_limit` of 0. Ideally we'd change this to make them
760                        // specify no disk limit instead, but for now we need to special-case here.
761                        && disk_limit != DiskLimit::ZERO
762                    {
763                        let heap_limit = memory_limit.0 + disk_limit.0;
764                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
765                    }
766
767                    args.extend(secrets_args.clone());
768                    args
769                }),
770                ports: vec![
771                    ServicePort {
772                        name: "storagectl".into(),
773                        port_hint: 2100,
774                    },
775                    // To simplify the changes to tests, the port
776                    // chosen here is _after_ the compute ones.
777                    // TODO(petrosagg): fix the numerical ordering here
778                    ServicePort {
779                        name: "storage".into(),
780                        port_hint: 2103,
781                    },
782                    ServicePort {
783                        name: "computectl".into(),
784                        port_hint: 2101,
785                    },
786                    ServicePort {
787                        name: "compute".into(),
788                        port_hint: 2102,
789                    },
790                    ServicePort {
791                        name: "internal-http".into(),
792                        port_hint: 6878,
793                    },
794                ],
795                cpu_limit: location.allocation.cpu_limit,
796                cpu_request: location.allocation.cpu_request,
797                memory_limit,
798                memory_request,
799                scale: location.allocation.scale,
800                labels: BTreeMap::from([
801                    ("replica-id".into(), replica_id.to_string()),
802                    ("cluster-id".into(), cluster_id.to_string()),
803                    ("type".into(), "cluster".into()),
804                    ("replica-role".into(), role_label.into()),
805                    ("workers".into(), location.allocation.workers.to_string()),
806                    (
807                        "size".into(),
808                        location
809                            .size
810                            .to_string()
811                            .replace("=", "-")
812                            .replace(",", "_"),
813                    ),
814                ]),
815                annotations: BTreeMap::from([
816                    ("replica-name".into(), replica_name),
817                    ("cluster-name".into(), cluster_name),
818                ]),
819                availability_zones: match location.availability_zones {
820                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
821                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
822                },
823                // This provides the orchestrator with some label selectors that
824                // are used to constraint the scheduling of replicas, based on
825                // its internal configuration.
826                other_replicas_selector: vec![
827                    LabelSelector {
828                        label_name: "cluster-id".to_string(),
829                        logic: LabelSelectionLogic::Eq {
830                            value: cluster_id.to_string(),
831                        },
832                    },
833                    // Select other replicas (but not oneself)
834                    LabelSelector {
835                        label_name: "replica-id".into(),
836                        logic: LabelSelectionLogic::NotEq {
837                            value: replica_id.to_string(),
838                        },
839                    },
840                ],
841                replicas_selector: vec![LabelSelector {
842                    label_name: "cluster-id".to_string(),
843                    // Select ALL replicas.
844                    logic: LabelSelectionLogic::Eq {
845                        value: cluster_id.to_string(),
846                    },
847                }],
848                disk_limit,
849                node_selector: location.allocation.selectors,
850            },
851        )?;
852
853        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
854            let tx = self.metrics_tx.clone();
855            let orchestrator = Arc::clone(&self.orchestrator);
856            let service_name = service_name.clone();
857            async move {
858                const METRICS_INTERVAL: Duration = Duration::from_secs(60);
859
860                // TODO[btv] -- I tried implementing a `watch_metrics` function,
861                // similar to `watch_services`, but it crashed due to
862                // https://github.com/kube-rs/kube/issues/1092 .
863                //
864                // If `metrics-server` can be made to fill in `resourceVersion`,
865                // or if that bug is fixed, we can try that again rather than using this inelegant
866                // loop.
867                let mut interval = tokio::time::interval(METRICS_INTERVAL);
868                loop {
869                    interval.tick().await;
870                    match orchestrator.fetch_service_metrics(&service_name).await {
871                        Ok(metrics) => {
872                            let _ = tx.send((replica_id, metrics));
873                        }
874                        Err(e) => {
875                            warn!("failed to get metrics for replica {replica_id}: {e}");
876                        }
877                    }
878                }
879            }
880        });
881
882        Ok((service, metrics_task.abort_on_drop()))
883    }
884
885    /// Deprovisions a replica with the service orchestrator.
886    fn deprovision_replica(
887        &self,
888        cluster_id: ClusterId,
889        replica_id: ReplicaId,
890        generation: u64,
891    ) -> Result<(), anyhow::Error> {
892        let service_name = ReplicaServiceName {
893            cluster_id,
894            replica_id,
895            generation,
896        }
897        .to_string();
898        self.orchestrator.drop_service(&service_name)
899    }
900}
901
902/// Remove all replicas from past generations.
903async fn try_remove_past_generation_replicas(
904    orchestrator: &dyn NamespacedOrchestrator,
905    deploy_generation: u64,
906) -> Result<(), anyhow::Error> {
907    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
908
909    for service in services {
910        let name: ReplicaServiceName = service.parse()?;
911        if name.generation < deploy_generation {
912            info!(
913                cluster_id = %name.cluster_id,
914                replica_id = %name.replica_id,
915                "removing past generation replica",
916            );
917            orchestrator.drop_service(&service)?;
918        }
919    }
920
921    Ok(())
922}
923
924/// Represents the name of a cluster replica service in the orchestrator.
925#[derive(PartialEq, Eq, PartialOrd, Ord)]
926pub struct ReplicaServiceName {
927    pub cluster_id: ClusterId,
928    pub replica_id: ReplicaId,
929    pub generation: u64,
930}
931
932impl fmt::Display for ReplicaServiceName {
933    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
934        let ReplicaServiceName {
935            cluster_id,
936            replica_id,
937            generation,
938        } = self;
939        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
940    }
941}
942
943impl FromStr for ReplicaServiceName {
944    type Err = anyhow::Error;
945
946    fn from_str(s: &str) -> Result<Self, Self::Err> {
947        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
948            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
949        });
950
951        let caps = SERVICE_NAME_RE
952            .captures(s)
953            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
954
955        Ok(ReplicaServiceName {
956            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
957            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
958            // Old versions of Materialize did not include generations in
959            // replica service names. Synthesize generation 0 if absent.
960            // TODO: remove this in the next version of Materialize.
961            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
962        })
963    }
964}