Skip to main content

mz_controller/
clusters.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Cluster management.
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::logging::LogVariant;
26use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
27use mz_controller_types::dyncfgs::{
28    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
29    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
30};
31use mz_controller_types::{ClusterId, ReplicaId};
32use mz_orchestrator::NamespacedOrchestrator;
33use mz_orchestrator::{
34    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
35    ServiceEvent, ServicePort,
36};
37use mz_ore::cast::CastInto;
38use mz_ore::task::{self, AbortOnDropHandle};
39use mz_ore::{halt, instrument};
40use mz_repr::GlobalId;
41use mz_repr::adt::numeric::Numeric;
42use regex::Regex;
43use serde::{Deserialize, Serialize};
44use tokio::time;
45use tracing::{error, info, warn};
46
47use crate::Controller;
48
/// Configures a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to register
    /// the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional arbitrary string that describes the class of the workload
    /// this cluster is running (e.g., `production` or `staging`).
    ///
    /// Forwarded to both the storage and the compute instance on creation.
    pub workload_class: Option<String>,
}
60
/// The status of a cluster.
///
/// An alias for the status of the orchestrator service backing the cluster's
/// replicas.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;
63
/// Configures a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica (managed by the orchestrator or unmanaged
    /// with caller-supplied addresses).
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}
72
/// Configures the resource allocation for a cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The CPU request for each process in the replica.
    pub cpu_request: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: NonZero<u16>,
    /// The number of worker threads in the replica.
    pub workers: NonZero<usize>,
    /// The number of credits per hour that the replica consumes.
    ///
    /// Deserialized from a string to preserve exact decimal values.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process has exclusive access to its CPU cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether instances of this type use swap as the spill-to-disk mechanism.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether instances of this type can be created.
    #[serde(default)]
    pub disabled: bool,
    /// Additional node selectors.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}
108
/// Serde default helper that yields `true`, for fields like
/// `ReplicaAllocation::is_cc` that should default to `true` when absent.
fn default_true() -> bool {
    true
}
112
#[mz_ore::test]
// We test this particularly because we deserialize values from strings.
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;
    use mz_ore::{assert_err, assert_ok};

    // A mostly-specified allocation: omitted fields (`cpu_request`,
    // `cpu_exclusive`, `is_cc`, `disabled`) must take their serde defaults.
    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_request: None,
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: NonZero::new(16).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    // Zero-valued limits and credits are valid; only `scale` and `workers`
    // are constrained to be non-zero.
    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_request: None,
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: NonZero::new(1).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: Default::default(),
        }
    );

    // `scale` and `workers` must be non-zero.
    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
}
200
/// Configures the location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica, whose processes the user runs and addresses
    /// the user supplies.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica, provisioned through the orchestrator.
    Managed(ManagedReplicaLocation),
}
209
210impl ReplicaLocation {
211    /// Returns the number of processes specified by this replica location.
212    pub fn num_processes(&self) -> usize {
213        match self {
214            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
215                computectl_addrs, ..
216            }) => computectl_addrs.len(),
217            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
218                allocation.scale.cast_into()
219            }
220        }
221    }
222
223    pub fn billed_as(&self) -> Option<&str> {
224        match self {
225            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
226                billed_as.as_deref()
227            }
228            ReplicaLocation::Unmanaged(_) => None,
229        }
230    }
231
232    pub fn internal(&self) -> bool {
233        match self {
234            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
235            ReplicaLocation::Unmanaged(_) => false,
236        }
237    }
238
239    /// Returns the number of workers specified by this replica location.
240    ///
241    /// `None` for unmanaged replicas, whose worker count we don't know.
242    pub fn workers(&self) -> Option<usize> {
243        match self {
244            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
245                Some(allocation.workers.get() * self.num_processes())
246            }
247            ReplicaLocation::Unmanaged(_) => None,
248        }
249    }
250
251    /// A pending replica is created as part of an alter cluster of an managed
252    /// cluster. the configuration of a pending replica will not match that of
253    /// the clusters until the alter has been finalized promoting the pending
254    /// replicas and setting this value to false.
255    pub fn pending(&self) -> bool {
256        match self {
257            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
258            ReplicaLocation::Unmanaged(_) => false,
259        }
260    }
261}
262
/// The "role" of a cluster, which is currently used to determine the
/// severity of alerts for problems with its replicas.
#[derive(Debug, Clone)]
pub enum ClusterRole {
    /// The existence and proper functioning of the cluster's replicas is
    /// business-critical for Materialize.
    SystemCritical,
    /// Assuming no bugs, the cluster's replicas should always exist and function
    /// properly. If it doesn't, however, that is less urgent than
    /// would be the case for a `SystemCritical` replica.
    System,
    /// The cluster is controlled by the user, and might go down for
    /// reasons outside our control (e.g., OOMs).
    User,
}
278
/// The location of an unmanaged replica.
///
/// The caller is responsible for running the replica processes; the
/// controller only connects to the given control endpoints.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    pub computectl_addrs: Vec<String>,
}
289
/// Information about availability zone constraints for replicas.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Specified if the `Replica` is from a `MANAGED` cluster,
    /// and specifies if there is an `AVAILABILITY ZONES`
    /// constraint. Empty lists are represented as `None`.
    FromCluster(Option<Vec<String>>),
    /// Specified if the `Replica` is from a non-`MANAGED` cluster,
    /// and specifies if there is a specific `AVAILABILITY ZONE`.
    FromReplica(Option<String>),
}
301
/// The location of a managed replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// SQL size parameter used for allocation.
    pub size: String,
    /// If `true`, Materialize support owns this replica.
    pub internal: bool,
    /// Optional SQL size parameter used for billing.
    pub billed_as: Option<String>,
    /// The replica's availability zones, if specified.
    ///
    /// This is either the replica's specific `AVAILABILITY ZONE`,
    /// or the zones placed here during replica concretization
    /// from the `MANAGED` cluster config.
    ///
    /// We skip serialization (which is used for some validation
    /// in tests) as the latter case is a "virtual" piece of information,
    /// that exists only at runtime.
    ///
    /// An empty list of availability zones is concretized as `None`,
    /// as the on-disk serialization of `MANAGED CLUSTER AVAILABILITY ZONES`
    /// is an empty list if none are specified.
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending reconfiguration.
    pub pending: bool,
}
331
332impl ManagedReplicaLocation {
333    /// Return the size which should be used to determine billing-related information.
334    pub fn size_for_billing(&self) -> &str {
335        self.billed_as.as_deref().unwrap_or(&self.size)
336    }
337}
338
/// Configures logging for a cluster replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;
344
/// An event describing a change in status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    /// The ID of the cluster whose replica changed status.
    pub cluster_id: ClusterId,
    /// The ID of the replica whose process changed status.
    pub replica_id: ReplicaId,
    /// The affected process within the replica.
    pub process_id: ProcessId,
    /// The new status of the process.
    pub status: ClusterStatus,
    /// The time at which the status change occurred.
    pub time: DateTime<Utc>,
}
354
355impl Controller {
356    /// Creates a cluster with the specified identifier and configuration.
357    ///
358    /// A cluster is a combination of a storage instance and a compute instance.
359    /// A cluster has zero or more replicas; each replica colocates the storage
360    /// and compute layers on the same physical resources.
361    pub fn create_cluster(
362        &mut self,
363        id: ClusterId,
364        config: ClusterConfig,
365    ) -> Result<(), anyhow::Error> {
366        self.storage
367            .create_instance(id, config.workload_class.clone());
368        self.compute
369            .create_instance(id, config.arranged_logs, config.workload_class)?;
370        Ok(())
371    }
372
373    /// Updates the workload class for a cluster.
374    ///
375    /// # Panics
376    ///
377    /// Panics if the instance does not exist in the StorageController or the ComputeController.
378    pub fn update_cluster_workload_class(&mut self, id: ClusterId, workload_class: Option<String>) {
379        self.storage
380            .update_instance_workload_class(id, workload_class.clone());
381        self.compute
382            .update_instance_workload_class(id, workload_class)
383            .expect("instance exists");
384    }
385
    /// Drops the specified cluster.
    ///
    /// Removes the cluster's instance from both the storage and the compute
    /// controller.
    ///
    /// # Panics
    ///
    /// Panics if the cluster still has replicas.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }
395
396    /// Creates a replica of the specified cluster with the specified identifier
397    /// and configuration.
398    pub fn create_replica(
399        &mut self,
400        cluster_id: ClusterId,
401        replica_id: ReplicaId,
402        cluster_name: String,
403        replica_name: String,
404        role: ClusterRole,
405        config: ReplicaConfig,
406        enable_worker_core_affinity: bool,
407        enable_storage_introspection_logs: bool,
408    ) -> Result<(), anyhow::Error> {
409        let storage_location: ClusterReplicaLocation;
410        let compute_location: ClusterReplicaLocation;
411        let metrics_task: Option<AbortOnDropHandle<()>>;
412
413        match config.location {
414            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
415                storagectl_addrs,
416                computectl_addrs,
417            }) => {
418                compute_location = ClusterReplicaLocation {
419                    ctl_addrs: computectl_addrs,
420                };
421                storage_location = ClusterReplicaLocation {
422                    ctl_addrs: storagectl_addrs,
423                };
424                metrics_task = None;
425            }
426            ReplicaLocation::Managed(m) => {
427                let (service, metrics_task_join_handle) = self.provision_replica(
428                    cluster_id,
429                    replica_id,
430                    cluster_name,
431                    replica_name,
432                    role,
433                    m,
434                    enable_worker_core_affinity,
435                    enable_storage_introspection_logs,
436                )?;
437                storage_location = ClusterReplicaLocation {
438                    ctl_addrs: service.addresses("storagectl"),
439                };
440                compute_location = ClusterReplicaLocation {
441                    ctl_addrs: service.addresses("computectl"),
442                };
443                metrics_task = Some(metrics_task_join_handle);
444
445                // Register the replica for HTTP proxying.
446                let http_addresses = service.addresses("internal-http");
447                self.replica_http_locator
448                    .register_replica(cluster_id, replica_id, http_addresses);
449            }
450        }
451
452        self.storage
453            .connect_replica(cluster_id, replica_id, storage_location);
454        self.compute.add_replica_to_instance(
455            cluster_id,
456            replica_id,
457            compute_location,
458            config.compute,
459        )?;
460
461        if let Some(task) = metrics_task {
462            self.metrics_tasks.insert(replica_id, task);
463        }
464
465        Ok(())
466    }
467
    /// Drops the specified replica of the specified cluster.
    ///
    /// Tears down the replica's orchestrator service (if any), removes its
    /// metrics task and HTTP proxy registration, and detaches it from the
    /// compute and storage controllers.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // We unconditionally deprovision even for unmanaged replicas to avoid
        // needing to keep track of which replicas are managed and which are
        // unmanaged. Deprovisioning is a no-op if the replica ID was never
        // provisioned.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        // Remove HTTP addresses from the locator.
        self.replica_http_locator
            .remove_replica(cluster_id, replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }
489
    /// Removes replicas from past generations in a background task.
    ///
    /// The spawned task retries until removal succeeds, sleeping between
    /// attempts for the interval configured by
    /// `CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL`.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            // Re-read the interval on each failure so dyncfg
                            // changes take effect without a restart.
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }
519
    /// Remove replicas that are orphaned in the current generation.
    ///
    /// A replica is orphaned when it exists in the orchestrator but the
    /// controller has no record of it (i.e., no entry in `metrics_tasks`).
    ///
    /// `next_user_replica_id` and `next_system_replica_id` are the next
    /// replica IDs this process would allocate; encountering an orchestrator
    /// replica with an ID at or above these values means another environmentd
    /// is competing for this generation, in which case this process halts.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        // The replicas we know about, keyed by their metrics tasks.
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        // The replicas the orchestrator is actually running, parsed from
        // their service names.
        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // We limit our attention here to replicas from the current deploy
            // generation. Replicas from past generations are cleaned up during
            // `Controller::allow_writes`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // Found a replica in the orchestrator with a higher replica ID
                // than what we are aware of. This must have been created by an
                // environmentd that's competing for control of this generation.
                // Abort to let the other process have full control.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }
573
    /// Returns a stream of events describing status changes of replica
    /// processes belonging to the current deploy generation.
    ///
    /// Events for replicas of other generations are filtered out; service
    /// watch errors are logged and dropped from the stream.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        // Translate an orchestrator service event into a `ClusterEvent`,
        // also extracting the replica's generation for filtering below.
        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        // Only surface events for replicas of our generation.
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }
618
619    /// Provisions a replica with the service orchestrator.
620    fn provision_replica(
621        &self,
622        cluster_id: ClusterId,
623        replica_id: ReplicaId,
624        cluster_name: String,
625        replica_name: String,
626        role: ClusterRole,
627        location: ManagedReplicaLocation,
628        enable_worker_core_affinity: bool,
629        enable_storage_introspection_logs: bool,
630    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
631        let service_name = ReplicaServiceName {
632            cluster_id,
633            replica_id,
634            generation: self.deploy_generation,
635        }
636        .to_string();
637        let role_label = match role {
638            ClusterRole::SystemCritical => "system-critical",
639            ClusterRole::System => "system",
640            ClusterRole::User => "user",
641        };
642        let environment_id = self.connection_context().environment_id.clone();
643        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
644        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
645        let persist_pubsub_url = self.persist_pubsub_url.clone();
646        let secrets_args = self.secrets_args.to_flags();
647
648        // TODO(teskje): use the same values as for compute?
649        let storage_proto_timely_config = TimelyConfig {
650            arrangement_exert_proportionality: 1337,
651            ..Default::default()
652        };
653        let compute_proto_timely_config = TimelyConfig {
654            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
655            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
656            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
657            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
658            ..Default::default()
659        };
660
661        let mut disk_limit = location.allocation.disk_limit;
662        let memory_limit = location.allocation.memory_limit;
663        let mut memory_request = None;
664
665        if location.allocation.swap_enabled {
666            // The disk limit we specify in the service config decides whether or not the replica
667            // gets a scratch disk attached. We want to avoid attaching disks to swap replicas, so
668            // make sure to set the disk limit accordingly.
669            disk_limit = Some(DiskLimit::ZERO);
670
671            // We want to keep the memory request equal to the memory limit, to avoid
672            // over-provisioning and ensure replicas have predictable performance. However, to
673            // enable swap, Kubernetes currently requires that request and limit are different.
674            memory_request = memory_limit.map(|MemoryLimit(limit)| {
675                let request = ByteSize::b(limit.as_u64() - 1);
676                MemoryLimit(request)
677            });
678        }
679
680        let service = self.orchestrator.ensure_service(
681            &service_name,
682            ServiceConfig {
683                image: self.clusterd_image.clone(),
684                init_container_image: self.init_container_image.clone(),
685                args: Box::new(move |assigned| {
686                    let storage_timely_config = TimelyConfig {
687                        workers: location.allocation.workers.get(),
688                        addresses: assigned.peer_addresses("storage"),
689                        ..storage_proto_timely_config
690                    };
691                    let compute_timely_config = TimelyConfig {
692                        workers: location.allocation.workers.get(),
693                        addresses: assigned.peer_addresses("compute"),
694                        ..compute_proto_timely_config
695                    };
696
697                    let mut args = vec![
698                        format!(
699                            "--storage-controller-listen-addr={}",
700                            assigned.listen_addrs["storagectl"]
701                        ),
702                        format!(
703                            "--compute-controller-listen-addr={}",
704                            assigned.listen_addrs["computectl"]
705                        ),
706                        format!(
707                            "--internal-http-listen-addr={}",
708                            assigned.listen_addrs["internal-http"]
709                        ),
710                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
711                        format!("--opentelemetry-resource=replica_id={}", replica_id),
712                        format!("--persist-pubsub-url={}", persist_pubsub_url),
713                        format!("--environment-id={}", environment_id),
714                        format!(
715                            "--storage-timely-config={}",
716                            storage_timely_config.to_string(),
717                        ),
718                        format!(
719                            "--compute-timely-config={}",
720                            compute_timely_config.to_string(),
721                        ),
722                    ];
723                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
724                        args.push(format!(
725                            "--aws-external-id-prefix={}",
726                            aws_external_id_prefix
727                        ));
728                    }
729                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
730                        args.push(format!(
731                            "--aws-connection-role-arn={}",
732                            aws_connection_role_arn
733                        ));
734                    }
735                    if let Some(memory_limit) = location.allocation.memory_limit {
736                        args.push(format!(
737                            "--announce-memory-limit={}",
738                            memory_limit.0.as_u64()
739                        ));
740                    }
741                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
742                        args.push("--worker-core-affinity".into());
743                    }
744                    if enable_storage_introspection_logs {
745                        args.push("--enable-storage-introspection-logs".into());
746                    }
747                    if location.allocation.is_cc {
748                        args.push("--is-cc".into());
749                    }
750
751                    // If swap is enabled, make the replica limit its own heap usage based on the
752                    // configured memory and disk limits.
753                    if location.allocation.swap_enabled
754                        && let Some(memory_limit) = location.allocation.memory_limit
755                        && let Some(disk_limit) = location.allocation.disk_limit
756                        // Currently, the way for replica sizes to request unlimited swap is to
757                        // specify a `disk_limit` of 0. Ideally we'd change this to make them
758                        // specify no disk limit instead, but for now we need to special-case here.
759                        && disk_limit != DiskLimit::ZERO
760                    {
761                        let heap_limit = memory_limit.0 + disk_limit.0;
762                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
763                    }
764
765                    args.extend(secrets_args.clone());
766                    args
767                }),
768                ports: vec![
769                    ServicePort {
770                        name: "storagectl".into(),
771                        port_hint: 2100,
772                    },
773                    // To simplify the changes to tests, the port
774                    // chosen here is _after_ the compute ones.
775                    // TODO(petrosagg): fix the numerical ordering here
776                    ServicePort {
777                        name: "storage".into(),
778                        port_hint: 2103,
779                    },
780                    ServicePort {
781                        name: "computectl".into(),
782                        port_hint: 2101,
783                    },
784                    ServicePort {
785                        name: "compute".into(),
786                        port_hint: 2102,
787                    },
788                    ServicePort {
789                        name: "internal-http".into(),
790                        port_hint: 6878,
791                    },
792                ],
793                cpu_limit: location.allocation.cpu_limit,
794                cpu_request: location.allocation.cpu_request,
795                memory_limit,
796                memory_request,
797                scale: location.allocation.scale,
798                labels: BTreeMap::from([
799                    ("replica-id".into(), replica_id.to_string()),
800                    ("cluster-id".into(), cluster_id.to_string()),
801                    ("type".into(), "cluster".into()),
802                    ("replica-role".into(), role_label.into()),
803                    ("workers".into(), location.allocation.workers.to_string()),
804                    (
805                        "size".into(),
806                        location
807                            .size
808                            .to_string()
809                            .replace("=", "-")
810                            .replace(",", "_"),
811                    ),
812                ]),
813                annotations: BTreeMap::from([
814                    ("replica-name".into(), replica_name),
815                    ("cluster-name".into(), cluster_name),
816                ]),
817                availability_zones: match location.availability_zones {
818                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
819                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
820                },
821                // This provides the orchestrator with some label selectors that
822                // are used to constraint the scheduling of replicas, based on
823                // its internal configuration.
824                other_replicas_selector: vec![
825                    LabelSelector {
826                        label_name: "cluster-id".to_string(),
827                        logic: LabelSelectionLogic::Eq {
828                            value: cluster_id.to_string(),
829                        },
830                    },
831                    // Select other replicas (but not oneself)
832                    LabelSelector {
833                        label_name: "replica-id".into(),
834                        logic: LabelSelectionLogic::NotEq {
835                            value: replica_id.to_string(),
836                        },
837                    },
838                ],
839                replicas_selector: vec![LabelSelector {
840                    label_name: "cluster-id".to_string(),
841                    // Select ALL replicas.
842                    logic: LabelSelectionLogic::Eq {
843                        value: cluster_id.to_string(),
844                    },
845                }],
846                disk_limit,
847                node_selector: location.allocation.selectors,
848            },
849        )?;
850
851        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
852            let tx = self.metrics_tx.clone();
853            let orchestrator = Arc::clone(&self.orchestrator);
854            let service_name = service_name.clone();
855            async move {
856                const METRICS_INTERVAL: Duration = Duration::from_secs(60);
857
858                // TODO[btv] -- I tried implementing a `watch_metrics` function,
859                // similar to `watch_services`, but it crashed due to
860                // https://github.com/kube-rs/kube/issues/1092 .
861                //
862                // If `metrics-server` can be made to fill in `resourceVersion`,
863                // or if that bug is fixed, we can try that again rather than using this inelegant
864                // loop.
865                let mut interval = tokio::time::interval(METRICS_INTERVAL);
866                loop {
867                    interval.tick().await;
868                    match orchestrator.fetch_service_metrics(&service_name).await {
869                        Ok(metrics) => {
870                            let _ = tx.send((replica_id, metrics));
871                        }
872                        Err(e) => {
873                            warn!("failed to get metrics for replica {replica_id}: {e}");
874                        }
875                    }
876                }
877            }
878        });
879
880        Ok((service, metrics_task.abort_on_drop()))
881    }
882
883    /// Deprovisions a replica with the service orchestrator.
884    fn deprovision_replica(
885        &self,
886        cluster_id: ClusterId,
887        replica_id: ReplicaId,
888        generation: u64,
889    ) -> Result<(), anyhow::Error> {
890        let service_name = ReplicaServiceName {
891            cluster_id,
892            replica_id,
893            generation,
894        }
895        .to_string();
896        self.orchestrator.drop_service(&service_name)
897    }
898}
899
900/// Remove all replicas from past generations.
901async fn try_remove_past_generation_replicas(
902    orchestrator: &dyn NamespacedOrchestrator,
903    deploy_generation: u64,
904) -> Result<(), anyhow::Error> {
905    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
906
907    for service in services {
908        let name: ReplicaServiceName = service.parse()?;
909        if name.generation < deploy_generation {
910            info!(
911                cluster_id = %name.cluster_id,
912                replica_id = %name.replica_id,
913                "removing past generation replica",
914            );
915            orchestrator.drop_service(&service)?;
916        }
917    }
918
919    Ok(())
920}
921
/// Represents the name of a cluster replica service in the orchestrator.
///
/// Round-trips through the `Display` and `FromStr` implementations below,
/// which encode the name as `{cluster_id}-replica-{replica_id}-gen-{generation}`.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    // The ID of the cluster the replica belongs to.
    pub cluster_id: ClusterId,
    // The ID of the replica itself.
    pub replica_id: ReplicaId,
    // The deploy generation under which the replica was provisioned.
    pub generation: u64,
}
929
930impl fmt::Display for ReplicaServiceName {
931    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
932        let ReplicaServiceName {
933            cluster_id,
934            replica_id,
935            generation,
936        } = self;
937        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
938    }
939}
940
941impl FromStr for ReplicaServiceName {
942    type Err = anyhow::Error;
943
944    fn from_str(s: &str) -> Result<Self, Self::Err> {
945        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
946            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
947        });
948
949        let caps = SERVICE_NAME_RE
950            .captures(s)
951            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
952
953        Ok(ReplicaServiceName {
954            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
955            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
956            // Old versions of Materialize did not include generations in
957            // replica service names. Synthesize generation 0 if absent.
958            // TODO: remove this in the next version of Materialize.
959            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
960        })
961    }
962}