// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Cluster management.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::num::NonZero;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::anyhow;
use bytesize::ByteSize;
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt};
use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
use mz_compute_client::controller::ComputeControllerTimestamp;
use mz_compute_client::logging::LogVariant;
use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
use mz_controller_types::dyncfgs::{
    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_orchestrator::NamespacedOrchestrator;
use mz_orchestrator::{
    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
    ServiceEvent, ServicePort,
};
use mz_ore::cast::CastInto;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_ore::{halt, instrument};
use mz_repr::GlobalId;
use mz_repr::adt::numeric::Numeric;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::time;
use tracing::{error, info, warn};

use crate::Controller;

/// Configures a cluster.
pub struct ClusterConfig {
    /// The logging variants to enable on the compute instance.
    ///
    /// Each logging variant is mapped to the identifier under which to register
    /// the arrangement storing the log's data.
    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
    /// An optional arbitrary string that describes the class of the workload
    /// this cluster is running (e.g., `production` or `staging`).
    pub workload_class: Option<String>,
}

/// The status of a cluster.
pub type ClusterStatus = mz_orchestrator::ServiceStatus;

/// Configures a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ReplicaConfig {
    /// The location of the replica.
    pub location: ReplicaLocation,
    /// Configuration for the compute half of the replica.
    pub compute: ComputeReplicaConfig,
}

/// Configures the resource allocation for a cluster replica.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ReplicaAllocation {
    /// The memory limit for each process in the replica.
    pub memory_limit: Option<MemoryLimit>,
    /// The CPU limit for each process in the replica.
    pub cpu_limit: Option<CpuLimit>,
    /// The disk limit for each process in the replica.
    pub disk_limit: Option<DiskLimit>,
    /// The number of processes in the replica.
    pub scale: NonZero<u16>,
    /// The number of worker threads in the replica.
    pub workers: NonZero<usize>,
    /// The number of credits per hour that the replica consumes.
    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
    pub credits_per_hour: Numeric,
    /// Whether each process has exclusive access to its CPU cores.
    #[serde(default)]
    pub cpu_exclusive: bool,
    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[serde(default = "default_true")]
    pub is_cc: bool,
    /// Whether instances of this type use swap as the spill-to-disk mechanism.
    #[serde(default)]
    pub swap_enabled: bool,
    /// Whether instances of this type can be created.
    #[serde(default)]
    pub disabled: bool,
    /// Additional node selectors.
    #[serde(default)]
    pub selectors: BTreeMap<String, String>,
}

fn default_true() -> bool {
    true
}

#[mz_ore::test]
// We test this particularly because we deserialize values from strings.
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn test_replica_allocation_deserialization() {
    use bytesize::ByteSize;
    use mz_ore::{assert_err, assert_ok};

    let data = r#"
        {
            "cpu_limit": 1.0,
            "memory_limit": "10GiB",
            "disk_limit": "100MiB",
            "scale": 16,
            "workers": 1,
            "credits_per_hour": "16",
            "swap_enabled": true,
            "selectors": {
                "key1": "value1",
                "key2": "value2"
            }
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 16.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
            disabled: false,
            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
            cpu_exclusive: false,
            is_cc: true,
            swap_enabled: true,
            scale: NonZero::new(16).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: BTreeMap::from([
                ("key1".to_string(), "value1".to_string()),
                ("key2".to_string(), "value2".to_string())
            ]),
        }
    );

    let data = r#"
        {
            "cpu_limit": 0,
            "memory_limit": "0GiB",
            "disk_limit": "0MiB",
            "scale": 1,
            "workers": 1,
            "credits_per_hour": "0",
            "cpu_exclusive": true,
            "disabled": true
        }"#;

    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
        .expect("deserialization from JSON succeeds for ReplicaAllocation");

    assert_eq!(
        replica_allocation,
        ReplicaAllocation {
            credits_per_hour: 0.into(),
            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
            disabled: true,
            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
            cpu_limit: Some(CpuLimit::from_millicpus(0)),
            cpu_exclusive: true,
            is_cc: true,
            swap_enabled: false,
            scale: NonZero::new(1).unwrap(),
            workers: NonZero::new(1).unwrap(),
            selectors: Default::default(),
        }
    );

    // `scale` and `workers` must be non-zero.
    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
}

/// Configures the location of a cluster replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum ReplicaLocation {
    /// An unmanaged replica.
    Unmanaged(UnmanagedReplicaLocation),
    /// A managed replica.
    Managed(ManagedReplicaLocation),
}

impl ReplicaLocation {
    /// Returns the number of processes specified by this replica location.
    pub fn num_processes(&self) -> usize {
        match self {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                computectl_addrs, ..
            }) => computectl_addrs.len(),
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                allocation.scale.cast_into()
            }
        }
    }

    /// Returns the SQL size parameter used for billing, if one was specified.
    pub fn billed_as(&self) -> Option<&str> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
                billed_as.as_deref()
            }
            _ => None,
        }
    }

    /// Reports whether Materialize support owns this replica.
    pub fn internal(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
            ReplicaLocation::Unmanaged(_) => false,
        }
    }

    /// Returns the number of workers specified by this replica location.
    ///
    /// `None` for unmanaged replicas, whose worker count we don't know.
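    ///
    /// For example, a managed replica with a `scale` of 2 and a per-process
    /// `workers` count of 4 reports `Some(8)` total workers.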
    pub fn workers(&self) -> Option<usize> {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
                Some(allocation.workers.get() * self.num_processes())
            }
            ReplicaLocation::Unmanaged(_) => None,
        }
    }

    /// Reports whether this replica is pending.
    ///
    /// A pending replica is created as part of an `ALTER CLUSTER` on a managed
    /// cluster. The configuration of a pending replica will not match that of
    /// the cluster until the alter has been finalized, promoting the pending
    /// replicas and setting this value to `false`.
    pub fn pending(&self) -> bool {
        match self {
            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
            _ => false,
        }
    }
}

/// The "role" of a cluster, which is currently used to determine the
/// severity of alerts for problems with its replicas.
pub enum ClusterRole {
    /// The existence and proper functioning of the cluster's replicas are
    /// business-critical for Materialize.
    SystemCritical,
    /// Assuming no bugs, the cluster's replicas should always exist and function
    /// properly. If they don't, however, that is less urgent than would be the
    /// case for a `SystemCritical` replica.
    System,
    /// The cluster is controlled by the user, and might go down for
    /// reasons outside our control (e.g., OOMs).
    User,
}

/// The location of an unmanaged replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UnmanagedReplicaLocation {
    /// The network addresses of the storagectl endpoints for each process in
    /// the replica.
    pub storagectl_addrs: Vec<String>,
    /// The network addresses of the computectl endpoints for each process in
    /// the replica.
    pub computectl_addrs: Vec<String>,
}

/// Information about availability zone constraints for replicas.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ManagedReplicaAvailabilityZones {
    /// Specified if the `Replica` is from a `MANAGED` cluster,
    /// and specifies if there is an `AVAILABILITY ZONES`
    /// constraint. Empty lists are represented as `None`.
    FromCluster(Option<Vec<String>>),
    /// Specified if the `Replica` is from a non-`MANAGED` cluster,
    /// and specifies if there is a specific `AVAILABILITY ZONE`.
    FromReplica(Option<String>),
}

/// The location of a managed replica.
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ManagedReplicaLocation {
    /// The resource allocation for the replica.
    pub allocation: ReplicaAllocation,
    /// SQL size parameter used for allocation
    pub size: String,
    /// If `true`, Materialize support owns this replica.
    pub internal: bool,
    /// Optional SQL size parameter used for billing.
    pub billed_as: Option<String>,
    /// The replica's availability zones, if specified.
    ///
    /// This is either the replica's specific `AVAILABILITY ZONE`,
    /// or the zones placed here during replica concretization
    /// from the `MANAGED` cluster config.
    ///
    /// We skip serialization (which is used for some validation
    /// in tests) as the latter case is a "virtual" piece of information
    /// that exists only at runtime.
    ///
    /// An empty list of availability zones is concretized as `None`,
    /// as the on-disk serialization of `MANAGED CLUSTER AVAILABILITY ZONES`
    /// is an empty list if none are specified
    #[serde(skip)]
    pub availability_zones: ManagedReplicaAvailabilityZones,
    /// Whether the replica is pending reconfiguration
    pub pending: bool,
}

impl ManagedReplicaLocation {
    /// Return the size which should be used to determine billing-related information.
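    ///
    /// For example, a replica whose `size` is `"xlarge"` but whose `billed_as` is
    /// `"large"` (illustrative size names) is billed as `"large"`; absent a
    /// `billed_as` override, the `size` itself is used.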
    pub fn size_for_billing(&self) -> &str {
        self.billed_as.as_deref().unwrap_or(&self.size)
    }
}

/// Configures logging for a cluster replica.
pub type ReplicaLogging = ComputeReplicaLogging;

/// Identifier of a process within a replica.
pub type ProcessId = u64;

/// An event describing a change in status of a cluster replica process.
#[derive(Debug, Clone, Serialize)]
pub struct ClusterEvent {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub process_id: ProcessId,
    pub status: ClusterStatus,
    pub time: DateTime<Utc>,
}

impl<T> Controller<T>
where
    T: ComputeControllerTimestamp,
{
    /// Creates a cluster with the specified identifier and configuration.
    ///
    /// A cluster is a combination of a storage instance and a compute instance.
    /// A cluster has zero or more replicas; each replica colocates the storage
    /// and compute layers on the same physical resources.
    pub fn create_cluster(
        &mut self,
        id: ClusterId,
        config: ClusterConfig,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .create_instance(id, config.workload_class.clone());
        self.compute
            .create_instance(id, config.arranged_logs, config.workload_class)?;
        Ok(())
    }

    /// Updates the workload class for a cluster.
    pub fn update_cluster_workload_class(
        &mut self,
        id: ClusterId,
        workload_class: Option<String>,
    ) -> Result<(), anyhow::Error> {
        self.storage
            .update_instance_workload_class(id, workload_class.clone());
        self.compute
            .update_instance_workload_class(id, workload_class)?;
        Ok(())
    }

    /// Drops the specified cluster.
    ///
    /// # Panics
    ///
    /// Panics if the cluster still has replicas.
    pub fn drop_cluster(&mut self, id: ClusterId) {
        self.storage.drop_instance(id);
        self.compute.drop_instance(id);
    }

    /// Creates a replica of the specified cluster with the specified identifier
    /// and configuration.
    pub fn create_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        config: ReplicaConfig,
        enable_worker_core_affinity: bool,
    ) -> Result<(), anyhow::Error> {
        let storage_location: ClusterReplicaLocation;
        let compute_location: ClusterReplicaLocation;
        let metrics_task: Option<AbortOnDropHandle<()>>;

        match config.location {
            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                storagectl_addrs,
                computectl_addrs,
            }) => {
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: computectl_addrs,
                };
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: storagectl_addrs,
                };
                metrics_task = None;
            }
            ReplicaLocation::Managed(m) => {
                let (service, metrics_task_join_handle) = self.provision_replica(
                    cluster_id,
                    replica_id,
                    cluster_name,
                    replica_name,
                    role,
                    m,
                    enable_worker_core_affinity,
                )?;
                storage_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("storagectl"),
                };
                compute_location = ClusterReplicaLocation {
                    ctl_addrs: service.addresses("computectl"),
                };
                metrics_task = Some(metrics_task_join_handle);

                // Register the service for HTTP proxying. TCP addresses are
                // queried lazily via tcp_addresses() since they may not be
                // available immediately (the process orchestrator allocates
                // TCP proxy ports asynchronously).
                self.replica_http_locator.register_replica(
                    cluster_id,
                    replica_id,
                    Arc::from(service),
                );
            }
        }

        self.storage
            .connect_replica(cluster_id, replica_id, storage_location);
        self.compute.add_replica_to_instance(
            cluster_id,
            replica_id,
            compute_location,
            config.compute,
        )?;

        if let Some(task) = metrics_task {
            self.metrics_tasks.insert(replica_id, task);
        }

        Ok(())
    }

    /// Drops the specified replica of the specified cluster.
    pub fn drop_replica(
        &mut self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
    ) -> Result<(), anyhow::Error> {
        // We unconditionally deprovision even for unmanaged replicas to avoid
        // needing to keep track of which replicas are managed and which are
        // unmanaged. Deprovisioning is a no-op if the replica ID was never
        // provisioned.
        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
        self.metrics_tasks.remove(&replica_id);

        // Remove HTTP addresses from the locator.
        self.replica_http_locator
            .remove_replica(cluster_id, replica_id);

        self.compute.drop_replica(cluster_id, replica_id)?;
        self.storage.drop_replica(cluster_id, replica_id);
        Ok(())
    }

    /// Removes replicas from past generations in a background task.
    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
        let deploy_generation = self.deploy_generation;
        let dyncfg = Arc::clone(self.compute.dyncfg());
        let orchestrator = Arc::clone(&self.orchestrator);
        task::spawn(
            || "controller_remove_past_generation_replicas",
            async move {
                info!("attempting to remove past generation replicas");
                loop {
                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
                        .await
                    {
                        Ok(()) => {
                            info!("successfully removed past generation replicas");
                            return;
                        }
                        Err(e) => {
                            let interval =
                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
                                    .get(&dyncfg);
                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
                            time::sleep(interval).await;
                        }
                    }
                }
            },
        );
    }

    /// Remove replicas that are orphaned in the current generation.
    #[instrument]
    pub async fn remove_orphaned_replicas(
        &mut self,
        next_user_replica_id: u64,
        next_system_replica_id: u64,
    ) -> Result<(), anyhow::Error> {
        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();

        let actual: BTreeSet<_> = self
            .orchestrator
            .list_services()
            .await?
            .iter()
            .map(|s| ReplicaServiceName::from_str(s))
            .collect::<Result<_, _>>()?;

        for ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } in actual
        {
            // We limit our attention here to replicas from the current deploy
            // generation. Replicas from past generations are cleaned up during
            // `Controller::allow_writes`.
            if generation != self.deploy_generation {
                continue;
            }

            let smaller_next = match replica_id {
                ReplicaId::User(id) if id >= next_user_replica_id => {
                    Some(ReplicaId::User(next_user_replica_id))
                }
                ReplicaId::System(id) if id >= next_system_replica_id => {
                    Some(ReplicaId::System(next_system_replica_id))
                }
                _ => None,
            };
            if let Some(next) = smaller_next {
                // Found a replica in the orchestrator with a higher replica ID
                // than what we are aware of. This must have been created by an
                // environmentd that's competing for control of this generation.
                // Abort to let the other process have full control.
                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
            }
            if !desired.contains(&replica_id) {
                self.deprovision_replica(cluster_id, replica_id, generation)?;
            }
        }

        Ok(())
    }

    /// Returns a stream of events describing status changes of cluster replica
    /// processes belonging to the current deploy generation.
    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
        let deploy_generation = self.deploy_generation;

        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
            let ReplicaServiceName {
                cluster_id,
                replica_id,
                generation: replica_generation,
                ..
            } = event.service_id.parse()?;

            let event = ClusterEvent {
                cluster_id,
                replica_id,
                process_id: event.process_id,
                status: event.status,
                time: event.time,
            };

            Ok((event, replica_generation))
        }

        let stream = self
            .orchestrator
            .watch_services()
            .map(|event| event.and_then(translate_event))
            .filter_map(move |event| async move {
                match event {
                    Ok((event, replica_generation)) => {
                        if replica_generation == deploy_generation {
                            Some(event)
                        } else {
                            None
                        }
                    }
                    Err(error) => {
                        error!("service watch error: {error}");
                        None
                    }
                }
            });

        Box::pin(stream)
    }

    /// Provisions a replica with the service orchestrator.
    fn provision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        cluster_name: String,
        replica_name: String,
        role: ClusterRole,
        location: ManagedReplicaLocation,
        enable_worker_core_affinity: bool,
    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation: self.deploy_generation,
        }
        .to_string();
        let role_label = match role {
            ClusterRole::SystemCritical => "system-critical",
            ClusterRole::System => "system",
            ClusterRole::User => "user",
        };
        let environment_id = self.connection_context().environment_id.clone();
        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
        let persist_pubsub_url = self.persist_pubsub_url.clone();
        let secrets_args = self.secrets_args.to_flags();

        // TODO(teskje): use the same values as for compute?
        let storage_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: 1337,
            ..Default::default()
        };
        let compute_proto_timely_config = TimelyConfig {
            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
            ..Default::default()
        };

        let mut disk_limit = location.allocation.disk_limit;
        let memory_limit = location.allocation.memory_limit;
        let mut memory_request = None;

        if location.allocation.swap_enabled {
            // The disk limit we specify in the service config decides whether or not the replica
            // gets a scratch disk attached. We want to avoid attaching disks to swap replicas, so
            // make sure to set the disk limit accordingly.
            disk_limit = Some(DiskLimit::ZERO);

            // We want to keep the memory request equal to the memory limit, to avoid
            // over-provisioning and ensure replicas have predictable performance. However, to
            // enable swap, Kubernetes currently requires that request and limit are different.
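            // Setting the request one byte below the limit satisfies that requirement while
            // keeping the two effectively equal.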
            memory_request = memory_limit.map(|MemoryLimit(limit)| {
                let request = ByteSize::b(limit.as_u64() - 1);
                MemoryLimit(request)
            });
        }

        let service = self.orchestrator.ensure_service(
            &service_name,
            ServiceConfig {
                image: self.clusterd_image.clone(),
                init_container_image: self.init_container_image.clone(),
                args: Box::new(move |assigned| {
                    let storage_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("storage"),
                        ..storage_proto_timely_config
                    };
                    let compute_timely_config = TimelyConfig {
                        workers: location.allocation.workers.get(),
                        addresses: assigned.peer_addresses("compute"),
                        ..compute_proto_timely_config
                    };

                    let mut args = vec![
                        format!(
                            "--storage-controller-listen-addr={}",
                            assigned.listen_addrs["storagectl"]
                        ),
                        format!(
                            "--compute-controller-listen-addr={}",
                            assigned.listen_addrs["computectl"]
                        ),
                        format!(
                            "--internal-http-listen-addr={}",
                            assigned.listen_addrs["internal-http"]
                        ),
                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
                        format!("--opentelemetry-resource=replica_id={}", replica_id),
                        format!("--persist-pubsub-url={}", persist_pubsub_url),
                        format!("--environment-id={}", environment_id),
                        format!(
                            "--storage-timely-config={}",
                            storage_timely_config.to_string(),
                        ),
                        format!(
                            "--compute-timely-config={}",
                            compute_timely_config.to_string(),
                        ),
                    ];
                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
                        args.push(format!(
                            "--aws-external-id-prefix={}",
                            aws_external_id_prefix
                        ));
                    }
                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
                        args.push(format!(
                            "--aws-connection-role-arn={}",
                            aws_connection_role_arn
                        ));
                    }
                    if let Some(memory_limit) = location.allocation.memory_limit {
                        args.push(format!(
                            "--announce-memory-limit={}",
                            memory_limit.0.as_u64()
                        ));
                    }
                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
                        args.push("--worker-core-affinity".into());
                    }
                    if location.allocation.is_cc {
                        args.push("--is-cc".into());
                    }

                    // If swap is enabled, make the replica limit its own heap usage based on the
                    // configured memory and disk limits.
                    if location.allocation.swap_enabled
                        && let Some(memory_limit) = location.allocation.memory_limit
                        && let Some(disk_limit) = location.allocation.disk_limit
                        // Currently, the way for replica sizes to request unlimited swap is to
                        // specify a `disk_limit` of 0. Ideally we'd change this to make them
                        // specify no disk limit instead, but for now we need to special-case here.
                        && disk_limit != DiskLimit::ZERO
                    {
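                        // E.g., a 10 GiB memory limit combined with a 20 GiB disk limit yields
                        // a 30 GiB heap limit.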
                        let heap_limit = memory_limit.0 + disk_limit.0;
                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
                    }

                    args.extend(secrets_args.clone());
                    args
                }),
                ports: vec![
                    ServicePort {
                        name: "storagectl".into(),
                        port_hint: 2100,
                    },
                    // To simplify the changes to tests, the port
                    // chosen here is _after_ the compute ones.
                    // TODO(petrosagg): fix the numerical ordering here
                    ServicePort {
                        name: "storage".into(),
                        port_hint: 2103,
                    },
                    ServicePort {
                        name: "computectl".into(),
                        port_hint: 2101,
                    },
                    ServicePort {
                        name: "compute".into(),
                        port_hint: 2102,
                    },
                    ServicePort {
                        name: "internal-http".into(),
                        port_hint: 6878,
                    },
                ],
                cpu_limit: location.allocation.cpu_limit,
                memory_limit,
                memory_request,
                scale: location.allocation.scale,
                labels: BTreeMap::from([
                    ("replica-id".into(), replica_id.to_string()),
                    ("cluster-id".into(), cluster_id.to_string()),
                    ("type".into(), "cluster".into()),
                    ("replica-role".into(), role_label.into()),
                    ("workers".into(), location.allocation.workers.to_string()),
                    (
                        "size".into(),
                        location
                            .size
                            .to_string()
                            .replace("=", "-")
                            .replace(",", "_"),
                    ),
                ]),
                annotations: BTreeMap::from([
                    ("replica-name".into(), replica_name),
                    ("cluster-name".into(), cluster_name),
                ]),
                availability_zones: match location.availability_zones {
                    ManagedReplicaAvailabilityZones::FromCluster(azs) => azs,
                    ManagedReplicaAvailabilityZones::FromReplica(az) => az.map(|z| vec![z]),
                },
                // This provides the orchestrator with some label selectors that
                // are used to constrain the scheduling of replicas, based on
                // its internal configuration.
                other_replicas_selector: vec![
                    LabelSelector {
                        label_name: "cluster-id".to_string(),
                        logic: LabelSelectionLogic::Eq {
                            value: cluster_id.to_string(),
                        },
                    },
                    // Select other replicas (but not oneself)
                    LabelSelector {
                        label_name: "replica-id".into(),
                        logic: LabelSelectionLogic::NotEq {
                            value: replica_id.to_string(),
                        },
                    },
                ],
                replicas_selector: vec![LabelSelector {
                    label_name: "cluster-id".to_string(),
                    // Select ALL replicas.
                    logic: LabelSelectionLogic::Eq {
                        value: cluster_id.to_string(),
                    },
                }],
                disk_limit,
                node_selector: location.allocation.selectors,
            },
        )?;

        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
            let tx = self.metrics_tx.clone();
            let orchestrator = Arc::clone(&self.orchestrator);
            let service_name = service_name.clone();
            async move {
                const METRICS_INTERVAL: Duration = Duration::from_secs(60);

                // TODO[btv] -- I tried implementing a `watch_metrics` function,
                // similar to `watch_services`, but it crashed due to
                // https://github.com/kube-rs/kube/issues/1092 .
                //
                // If `metrics-server` can be made to fill in `resourceVersion`,
                // or if that bug is fixed, we can try that again rather than using this inelegant
                // loop.
                let mut interval = tokio::time::interval(METRICS_INTERVAL);
                loop {
                    interval.tick().await;
                    match orchestrator.fetch_service_metrics(&service_name).await {
                        Ok(metrics) => {
                            let _ = tx.send((replica_id, metrics));
                        }
                        Err(e) => {
                            warn!("failed to get metrics for replica {replica_id}: {e}");
                        }
                    }
                }
            }
        });

        Ok((service, metrics_task.abort_on_drop()))
    }

    /// Deprovisions a replica with the service orchestrator.
    fn deprovision_replica(
        &self,
        cluster_id: ClusterId,
        replica_id: ReplicaId,
        generation: u64,
    ) -> Result<(), anyhow::Error> {
        let service_name = ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        }
        .to_string();
        self.orchestrator.drop_service(&service_name)
    }
}

/// Remove all replicas from past generations.
async fn try_remove_past_generation_replicas(
    orchestrator: &dyn NamespacedOrchestrator,
    deploy_generation: u64,
) -> Result<(), anyhow::Error> {
    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();

    for service in services {
        let name: ReplicaServiceName = service.parse()?;
        if name.generation < deploy_generation {
            info!(
                cluster_id = %name.cluster_id,
                replica_id = %name.replica_id,
                "removing past generation replica",
            );
            orchestrator.drop_service(&service)?;
        }
    }

    Ok(())
}

/// Represents the name of a cluster replica service in the orchestrator.
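///
/// Service names have the form `{cluster_id}-replica-{replica_id}-gen-{generation}`;
/// see the `Display` and `FromStr` implementations below. A sketch of the expected
/// round-trip, using illustrative IDs (not compiled as a doctest):
///
/// ```ignore
/// let name: ReplicaServiceName = "u1-replica-u3-gen-0".parse().unwrap();
/// assert_eq!(name.generation, 0);
/// assert_eq!(name.to_string(), "u1-replica-u3-gen-0");
/// ```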
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct ReplicaServiceName {
    pub cluster_id: ClusterId,
    pub replica_id: ReplicaId,
    pub generation: u64,
}

impl fmt::Display for ReplicaServiceName {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ReplicaServiceName {
            cluster_id,
            replica_id,
            generation,
        } = self;
        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
    }
}

impl FromStr for ReplicaServiceName {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
        });

        let caps = SERVICE_NAME_RE
            .captures(s)
            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;

        Ok(ReplicaServiceName {
            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
            // Old versions of Materialize did not include generations in
            // replica service names. Synthesize generation 0 if absent.
            // TODO: remove this in the next version of Materialize.
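            // E.g., `u1-replica-u3` parses the same as `u1-replica-u3-gen-0`.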
            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
        })
    }
}