Skip to main content

mz_controller/
clusters.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Cluster management.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::fmt;
14use std::num::NonZero;
15use std::str::FromStr;
16use std::sync::Arc;
17use std::sync::LazyLock;
18use std::time::Duration;
19
20use anyhow::anyhow;
21use bytesize::ByteSize;
22use chrono::{DateTime, Utc};
23use futures::stream::{BoxStream, StreamExt};
24use mz_cluster_client::client::{ClusterReplicaLocation, TimelyConfig};
25use mz_compute_client::logging::LogVariant;
26use mz_compute_types::config::{ComputeReplicaConfig, ComputeReplicaLogging};
27use mz_controller_types::dyncfgs::{
28    ARRANGEMENT_EXERT_PROPORTIONALITY, CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL,
29    ENABLE_TIMELY_ZERO_COPY, ENABLE_TIMELY_ZERO_COPY_LGALLOC, TIMELY_ZERO_COPY_LIMIT,
30};
31use mz_controller_types::{ClusterId, ReplicaId};
32use mz_orchestrator::NamespacedOrchestrator;
33use mz_orchestrator::{
34    CpuLimit, DiskLimit, LabelSelectionLogic, LabelSelector, MemoryLimit, Service, ServiceConfig,
35    ServiceEvent, ServicePort,
36};
37use mz_ore::cast::CastInto;
38use mz_ore::task::{self, AbortOnDropHandle};
39use mz_ore::{halt, instrument};
40use mz_repr::GlobalId;
41use mz_repr::adt::numeric::Numeric;
42use regex::Regex;
43use serde::{Deserialize, Serialize};
44use tokio::time;
45use tracing::{error, info, warn};
46
47use crate::Controller;
48
49/// Configures a cluster.
50pub struct ClusterConfig {
51    /// The logging variants to enable on the compute instance.
52    ///
53    /// Each logging variant is mapped to the identifier under which to register
54    /// the arrangement storing the log's data.
55    pub arranged_logs: BTreeMap<LogVariant, GlobalId>,
56    /// An optional arbitrary string that describes the class of the workload
57    /// this cluster is running (e.g., `production` or `staging`).
58    pub workload_class: Option<String>,
59}
60
61/// The status of a cluster.
62pub type ClusterStatus = mz_orchestrator::ServiceStatus;
63
64/// Configures a cluster replica.
65#[derive(Clone, Debug, Serialize, PartialEq)]
66pub struct ReplicaConfig {
67    /// The location of the replica.
68    pub location: ReplicaLocation,
69    /// Configuration for the compute half of the replica.
70    pub compute: ComputeReplicaConfig,
71}
72
73/// Configures the resource allocation for a cluster replica.
74#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
75pub struct ReplicaAllocation {
76    /// The memory limit for each process in the replica.
77    pub memory_limit: Option<MemoryLimit>,
78    /// The CPU limit for each process in the replica.
79    pub cpu_limit: Option<CpuLimit>,
80    /// The CPU limit for each process in the replica.
81    pub cpu_request: Option<CpuLimit>,
82    /// The disk limit for each process in the replica.
83    pub disk_limit: Option<DiskLimit>,
84    /// The number of processes in the replica.
85    pub scale: NonZero<u16>,
86    /// The number of worker threads in the replica.
87    pub workers: NonZero<usize>,
88    /// The number of credits per hour that the replica consumes.
89    #[serde(deserialize_with = "mz_repr::adt::numeric::str_serde::deserialize")]
90    pub credits_per_hour: Numeric,
91    /// Whether each process has exclusive access to its CPU cores.
92    #[serde(default)]
93    pub cpu_exclusive: bool,
94    /// Whether this size represents a modern "cc" size rather than a legacy
95    /// T-shirt size.
96    #[serde(default = "default_true")]
97    pub is_cc: bool,
98    /// The size *family* this size belongs to, e.g. the size `D.1-xsmall`
99    /// belongs to family `D` and the legacy t-shirt sizes belong to family
100    /// `legacy`. The family is the coarse axis and is *not* a prefix of the size
101    /// name in general. Used as the
102    /// `replica_size_family` attribute when evaluating replica-local scoped
103    /// feature flags (see the scoped feature flags design). When unset, the
104    /// family falls back to a value derived from [`Self::is_cc`] via
105    /// [`ReplicaAllocation::family`].
106    #[serde(default)]
107    pub family: Option<String>,
108    /// Whether instances of this type use swap as the spill-to-disk mechanism.
109    #[serde(default)]
110    pub swap_enabled: bool,
111    /// Whether instances of this type can be created.
112    #[serde(default)]
113    pub disabled: bool,
114    /// Additional node selectors.
115    #[serde(default)]
116    pub selectors: BTreeMap<String, String>,
117}
118
119impl ReplicaAllocation {
120    /// The name of the size family this allocation belongs to, used as the
121    /// `replica_size_family` attribute when evaluating replica-local scoped
122    /// feature flags.
123    ///
124    /// Falls back to a value derived from [`Self::is_cc`] when [`Self::family`]
125    /// is unset: `"cc"` for modern sizes and `"legacy"` for the legacy t-shirt
126    /// sizes. This keeps the legacy family targetable even before every size
127    /// gains an explicit `family` in the size configuration.
128    pub fn family(&self) -> &str {
129        match &self.family {
130            Some(family) => family.as_str(),
131            None if self.is_cc => "cc",
132            None => "legacy",
133        }
134    }
135}
136
/// Serde default for boolean fields that should be `true` when omitted
/// (used for `ReplicaAllocation::is_cc`).
fn default_true() -> bool {
    true
}
140
141#[mz_ore::test]
142// We test this particularly because we deserialize values from strings.
143#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
144fn test_replica_allocation_deserialization() {
145    use bytesize::ByteSize;
146    use mz_ore::{assert_err, assert_ok};
147
148    let data = r#"
149        {
150            "cpu_limit": 1.0,
151            "memory_limit": "10GiB",
152            "disk_limit": "100MiB",
153            "scale": 16,
154            "workers": 1,
155            "credits_per_hour": "16",
156            "swap_enabled": true,
157            "selectors": {
158                "key1": "value1",
159                "key2": "value2"
160            }
161        }"#;
162
163    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
164        .expect("deserialization from JSON succeeds for ReplicaAllocation");
165
166    assert_eq!(
167        replica_allocation,
168        ReplicaAllocation {
169            credits_per_hour: 16.into(),
170            disk_limit: Some(DiskLimit(ByteSize::mib(100))),
171            disabled: false,
172            memory_limit: Some(MemoryLimit(ByteSize::gib(10))),
173            cpu_limit: Some(CpuLimit::from_millicpus(1000)),
174            cpu_request: None,
175            cpu_exclusive: false,
176            is_cc: true,
177            family: None,
178            swap_enabled: true,
179            scale: NonZero::new(16).unwrap(),
180            workers: NonZero::new(1).unwrap(),
181            selectors: BTreeMap::from([
182                ("key1".to_string(), "value1".to_string()),
183                ("key2".to_string(), "value2".to_string())
184            ]),
185        }
186    );
187
188    let data = r#"
189        {
190            "cpu_limit": 0,
191            "memory_limit": "0GiB",
192            "disk_limit": "0MiB",
193            "scale": 1,
194            "workers": 1,
195            "credits_per_hour": "0",
196            "cpu_exclusive": true,
197            "disabled": true
198        }"#;
199
200    let replica_allocation: ReplicaAllocation = serde_json::from_str(data)
201        .expect("deserialization from JSON succeeds for ReplicaAllocation");
202
203    assert_eq!(
204        replica_allocation,
205        ReplicaAllocation {
206            credits_per_hour: 0.into(),
207            disk_limit: Some(DiskLimit(ByteSize::mib(0))),
208            disabled: true,
209            memory_limit: Some(MemoryLimit(ByteSize::gib(0))),
210            cpu_limit: Some(CpuLimit::from_millicpus(0)),
211            cpu_request: None,
212            cpu_exclusive: true,
213            is_cc: true,
214            family: None,
215            swap_enabled: false,
216            scale: NonZero::new(1).unwrap(),
217            workers: NonZero::new(1).unwrap(),
218            selectors: Default::default(),
219        }
220    );
221
222    // `scale` and `workers` must be non-zero.
223    let data = r#"{"scale": 0, "workers": 1, "credits_per_hour": "0"}"#;
224    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
225    let data = r#"{"scale": 1, "workers": 0, "credits_per_hour": "0"}"#;
226    assert_err!(serde_json::from_str::<ReplicaAllocation>(data));
227    let data = r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#;
228    assert_ok!(serde_json::from_str::<ReplicaAllocation>(data));
229}
230
231#[mz_ore::test]
232#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
233fn test_replica_allocation_family() {
234    let parse = |json: &str| -> ReplicaAllocation {
235        serde_json::from_str(json).expect("deserialization from JSON succeeds")
236    };
237
238    // An explicit `family` is used verbatim.
239    assert_eq!(
240        parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "family": "D"}"#).family(),
241        "D"
242    );
243    // Without an explicit `family`, modern (`is_cc`) sizes fall back to "cc".
244    // `is_cc` defaults to true.
245    assert_eq!(
246        parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0"}"#).family(),
247        "cc"
248    );
249    // Without an explicit `family`, legacy (non-`is_cc`) sizes fall back to
250    // "legacy".
251    assert_eq!(
252        parse(r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "is_cc": false}"#).family(),
253        "legacy"
254    );
255    // An explicit family wins even for a legacy size.
256    assert_eq!(
257        parse(
258            r#"{"scale": 1, "workers": 1, "credits_per_hour": "0", "is_cc": false, "family": "legacy-special"}"#
259        )
260        .family(),
261        "legacy-special"
262    );
263}
264
265/// Configures the location of a cluster replica.
266#[derive(Clone, Debug, Serialize, PartialEq)]
267pub enum ReplicaLocation {
268    /// An unmanaged replica.
269    Unmanaged(UnmanagedReplicaLocation),
270    /// A managed replica.
271    Managed(ManagedReplicaLocation),
272}
273
274impl ReplicaLocation {
275    /// Returns the number of processes specified by this replica location.
276    pub fn num_processes(&self) -> usize {
277        match self {
278            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
279                computectl_addrs, ..
280            }) => computectl_addrs.len(),
281            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
282                allocation.scale.cast_into()
283            }
284        }
285    }
286
287    pub fn billed_as(&self) -> Option<&str> {
288        match self {
289            ReplicaLocation::Managed(ManagedReplicaLocation { billed_as, .. }) => {
290                billed_as.as_deref()
291            }
292            ReplicaLocation::Unmanaged(_) => None,
293        }
294    }
295
296    pub fn internal(&self) -> bool {
297        match self {
298            ReplicaLocation::Managed(ManagedReplicaLocation { internal, .. }) => *internal,
299            ReplicaLocation::Unmanaged(_) => false,
300        }
301    }
302
303    /// Returns the number of workers specified by this replica location.
304    ///
305    /// `None` for unmanaged replicas, whose worker count we don't know.
306    pub fn workers(&self) -> Option<usize> {
307        match self {
308            ReplicaLocation::Managed(ManagedReplicaLocation { allocation, .. }) => {
309                Some(allocation.workers.get() * self.num_processes())
310            }
311            ReplicaLocation::Unmanaged(_) => None,
312        }
313    }
314
315    /// A pending replica is created as part of an alter cluster of an managed
316    /// cluster. the configuration of a pending replica will not match that of
317    /// the clusters until the alter has been finalized promoting the pending
318    /// replicas and setting this value to false.
319    pub fn pending(&self) -> bool {
320        match self {
321            ReplicaLocation::Managed(ManagedReplicaLocation { pending, .. }) => *pending,
322            ReplicaLocation::Unmanaged(_) => false,
323        }
324    }
325}
326
/// The "role" of a cluster.
///
/// The role is currently used to determine how severe alerts are for
/// problems with the cluster's replicas.
#[derive(Debug, Clone)]
pub enum ClusterRole {
    /// The cluster's replicas are business-critical for Materialize: they
    /// must exist and function properly.
    SystemCritical,
    /// Barring bugs, the cluster's replicas should always exist and function
    /// properly, but a failure is less urgent than for a
    /// [`ClusterRole::SystemCritical`] cluster.
    System,
    /// The cluster is controlled by the user, and its replicas might go down
    /// for reasons outside our control (e.g., OOMs).
    User,
}
342
343/// The location of an unmanaged replica.
344#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
345pub struct UnmanagedReplicaLocation {
346    /// The network addresses of the storagectl endpoints for each process in
347    /// the replica.
348    pub storagectl_addrs: Vec<String>,
349    /// The network addresses of the computectl endpoints for each process in
350    /// the replica.
351    pub computectl_addrs: Vec<String>,
352}
353
354/// The location of a managed replica.
355#[derive(Clone, Debug, Serialize, PartialEq)]
356pub struct ManagedReplicaLocation {
357    /// The resource allocation for the replica.
358    pub allocation: ReplicaAllocation,
359    /// SQL size parameter used for allocation
360    pub size: String,
361    /// If `true`, Materialize support owns this replica.
362    pub internal: bool,
363    /// Optional SQL size parameter used for billing.
364    pub billed_as: Option<String>,
365    /// The availability zones the replica may be placed in; empty means
366    /// unconstrained.
367    ///
368    /// For a replica of a managed cluster this is the cluster's
369    /// `AVAILABILITY ZONES` pool; for a replica of an unmanaged cluster it is
370    /// the single user-pinned `AVAILABILITY ZONE`, as a zero- or one-element
371    /// list.
372    ///
373    /// Not serialized: this is re-derived from the cluster config at
374    /// concretization, not read back from a durable record.
375    #[serde(skip)]
376    pub availability_zones: Vec<String>,
377    /// Whether the replica is pending reconfiguration
378    pub pending: bool,
379}
380
381impl ManagedReplicaLocation {
382    /// Return the size which should be used to determine billing-related information.
383    pub fn size_for_billing(&self) -> &str {
384        self.billed_as.as_deref().unwrap_or(&self.size)
385    }
386}
387
388/// Configures logging for a cluster replica.
389pub type ReplicaLogging = ComputeReplicaLogging;
390
/// Identifies a single process within a replica, as carried in
/// `ClusterEvent::process_id`.
pub type ProcessId = u64;
393
394/// An event describing a change in status of a cluster replica process.
395#[derive(Debug, Clone, Serialize)]
396pub struct ClusterEvent {
397    pub cluster_id: ClusterId,
398    pub replica_id: ReplicaId,
399    pub process_id: ProcessId,
400    pub status: ClusterStatus,
401    pub time: DateTime<Utc>,
402}
403
404impl Controller {
405    /// Creates a cluster with the specified identifier and configuration.
406    ///
407    /// A cluster is a combination of a storage instance and a compute instance.
408    /// A cluster has zero or more replicas; each replica colocates the storage
409    /// and compute layers on the same physical resources.
410    pub fn create_cluster(
411        &mut self,
412        id: ClusterId,
413        config: ClusterConfig,
414    ) -> Result<(), anyhow::Error> {
415        self.storage
416            .create_instance(id, config.workload_class.clone());
417        self.compute
418            .create_instance(id, config.arranged_logs, config.workload_class)?;
419        Ok(())
420    }
421
422    /// Updates the workload class for a cluster.
423    ///
424    /// # Panics
425    ///
426    /// Panics if the instance does not exist in the StorageController or the ComputeController.
427    pub fn update_cluster_workload_class(&mut self, id: ClusterId, workload_class: Option<String>) {
428        self.storage
429            .update_instance_workload_class(id, workload_class.clone());
430        self.compute
431            .update_instance_workload_class(id, workload_class)
432            .expect("instance exists");
433    }
434
435    /// Drops the specified cluster.
436    ///
437    /// # Panics
438    ///
439    /// Panics if the cluster still has replicas.
440    pub fn drop_cluster(&mut self, id: ClusterId) {
441        self.storage.drop_instance(id);
442        self.compute.drop_instance(id);
443    }
444
445    /// Creates a replica of the specified cluster with the specified identifier
446    /// and configuration.
447    pub fn create_replica(
448        &mut self,
449        cluster_id: ClusterId,
450        replica_id: ReplicaId,
451        cluster_name: String,
452        replica_name: String,
453        role: ClusterRole,
454        config: ReplicaConfig,
455        enable_worker_core_affinity: bool,
456        enable_storage_introspection_logs: bool,
457    ) -> Result<(), anyhow::Error> {
458        let storage_location: ClusterReplicaLocation;
459        let compute_location: ClusterReplicaLocation;
460        let metrics_task: Option<AbortOnDropHandle<()>>;
461
462        match config.location {
463            ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
464                storagectl_addrs,
465                computectl_addrs,
466            }) => {
467                compute_location = ClusterReplicaLocation {
468                    ctl_addrs: computectl_addrs,
469                };
470                storage_location = ClusterReplicaLocation {
471                    ctl_addrs: storagectl_addrs,
472                };
473                metrics_task = None;
474            }
475            ReplicaLocation::Managed(m) => {
476                let (service, metrics_task_join_handle) = self.provision_replica(
477                    cluster_id,
478                    replica_id,
479                    cluster_name,
480                    replica_name,
481                    role,
482                    m,
483                    enable_worker_core_affinity,
484                    enable_storage_introspection_logs,
485                )?;
486                storage_location = ClusterReplicaLocation {
487                    ctl_addrs: service.addresses("storagectl"),
488                };
489                compute_location = ClusterReplicaLocation {
490                    ctl_addrs: service.addresses("computectl"),
491                };
492                metrics_task = Some(metrics_task_join_handle);
493
494                // Register the replica for HTTP proxying.
495                let http_addresses = service.addresses("internal-http");
496                self.replica_http_locator
497                    .register_replica(cluster_id, replica_id, http_addresses);
498            }
499        }
500
501        self.storage
502            .connect_replica(cluster_id, replica_id, storage_location);
503        self.compute.add_replica_to_instance(
504            cluster_id,
505            replica_id,
506            compute_location,
507            config.compute,
508        )?;
509
510        if let Some(task) = metrics_task {
511            self.metrics_tasks.insert(replica_id, task);
512        }
513
514        Ok(())
515    }
516
517    /// Drops the specified replica of the specified cluster.
518    pub fn drop_replica(
519        &mut self,
520        cluster_id: ClusterId,
521        replica_id: ReplicaId,
522    ) -> Result<(), anyhow::Error> {
523        // We unconditionally deprovision even for unmanaged replicas to avoid
524        // needing to keep track of which replicas are managed and which are
525        // unmanaged. Deprovisioning is a no-op if the replica ID was never
526        // provisioned.
527        self.deprovision_replica(cluster_id, replica_id, self.deploy_generation)?;
528        self.metrics_tasks.remove(&replica_id);
529
530        // Remove HTTP addresses from the locator.
531        self.replica_http_locator
532            .remove_replica(cluster_id, replica_id);
533
534        self.compute.drop_replica(cluster_id, replica_id)?;
535        self.storage.drop_replica(cluster_id, replica_id);
536        Ok(())
537    }
538
539    /// Removes replicas from past generations in a background task.
540    pub(crate) fn remove_past_generation_replicas_in_background(&self) {
541        let deploy_generation = self.deploy_generation;
542        let dyncfg = Arc::clone(self.compute.dyncfg());
543        let orchestrator = Arc::clone(&self.orchestrator);
544        task::spawn(
545            || "controller_remove_past_generation_replicas",
546            async move {
547                info!("attempting to remove past generation replicas");
548                loop {
549                    match try_remove_past_generation_replicas(&*orchestrator, deploy_generation)
550                        .await
551                    {
552                        Ok(()) => {
553                            info!("successfully removed past generation replicas");
554                            return;
555                        }
556                        Err(e) => {
557                            let interval =
558                                CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL
559                                    .get(&dyncfg);
560                            warn!(%e, "failed to remove past generation replicas; will retry in {interval:?}");
561                            time::sleep(interval).await;
562                        }
563                    }
564                }
565            },
566        );
567    }
568
569    /// Remove replicas that are orphaned in the current generation.
570    #[instrument]
571    pub async fn remove_orphaned_replicas(
572        &mut self,
573        next_user_replica_id: u64,
574        next_system_replica_id: u64,
575    ) -> Result<(), anyhow::Error> {
576        let desired: BTreeSet<_> = self.metrics_tasks.keys().copied().collect();
577
578        let actual: BTreeSet<_> = self
579            .orchestrator
580            .list_services()
581            .await?
582            .iter()
583            .map(|s| ReplicaServiceName::from_str(s))
584            .collect::<Result<_, _>>()?;
585
586        for ReplicaServiceName {
587            cluster_id,
588            replica_id,
589            generation,
590        } in actual
591        {
592            // We limit our attention here to replicas from the current deploy
593            // generation. Replicas from past generations are cleaned up during
594            // `Controller::allow_writes`.
595            if generation != self.deploy_generation {
596                continue;
597            }
598
599            let smaller_next = match replica_id {
600                ReplicaId::User(id) if id >= next_user_replica_id => {
601                    Some(ReplicaId::User(next_user_replica_id))
602                }
603                ReplicaId::System(id) if id >= next_system_replica_id => {
604                    Some(ReplicaId::System(next_system_replica_id))
605                }
606                _ => None,
607            };
608            if let Some(next) = smaller_next {
609                // Found a replica in the orchestrator with a higher replica ID
610                // than what we are aware of. This must have been created by an
611                // environmentd that's competing for control of this generation.
612                // Abort to let the other process have full control.
613                halt!("found replica ID ({replica_id}) in orchestrator >= next ID ({next})");
614            }
615            if !desired.contains(&replica_id) {
616                self.deprovision_replica(cluster_id, replica_id, generation)?;
617            }
618        }
619
620        Ok(())
621    }
622
623    pub fn events_stream(&self) -> BoxStream<'static, ClusterEvent> {
624        let deploy_generation = self.deploy_generation;
625
626        fn translate_event(event: ServiceEvent) -> Result<(ClusterEvent, u64), anyhow::Error> {
627            let ReplicaServiceName {
628                cluster_id,
629                replica_id,
630                generation: replica_generation,
631                ..
632            } = event.service_id.parse()?;
633
634            let event = ClusterEvent {
635                cluster_id,
636                replica_id,
637                process_id: event.process_id,
638                status: event.status,
639                time: event.time,
640            };
641
642            Ok((event, replica_generation))
643        }
644
645        let stream = self
646            .orchestrator
647            .watch_services()
648            .map(|event| event.and_then(translate_event))
649            .filter_map(move |event| async move {
650                match event {
651                    Ok((event, replica_generation)) => {
652                        if replica_generation == deploy_generation {
653                            Some(event)
654                        } else {
655                            None
656                        }
657                    }
658                    Err(error) => {
659                        error!("service watch error: {error}");
660                        None
661                    }
662                }
663            });
664
665        Box::pin(stream)
666    }
667
668    /// Provisions a replica with the service orchestrator.
669    fn provision_replica(
670        &self,
671        cluster_id: ClusterId,
672        replica_id: ReplicaId,
673        cluster_name: String,
674        replica_name: String,
675        role: ClusterRole,
676        location: ManagedReplicaLocation,
677        enable_worker_core_affinity: bool,
678        enable_storage_introspection_logs: bool,
679    ) -> Result<(Box<dyn Service>, AbortOnDropHandle<()>), anyhow::Error> {
680        let service_name = ReplicaServiceName {
681            cluster_id,
682            replica_id,
683            generation: self.deploy_generation,
684        }
685        .to_string();
686        let role_label = match role {
687            ClusterRole::SystemCritical => "system-critical",
688            ClusterRole::System => "system",
689            ClusterRole::User => "user",
690        };
691        let environment_id = self.connection_context().environment_id.clone();
692        let aws_external_id_prefix = self.connection_context().aws_external_id_prefix.clone();
693        let aws_connection_role_arn = self.connection_context().aws_connection_role_arn.clone();
694        let persist_pubsub_url = self.persist_pubsub_url.clone();
695        let secrets_args = self.secrets_args.to_flags();
696
697        // TODO(teskje): use the same values as for compute?
698        let storage_proto_timely_config = TimelyConfig {
699            arrangement_exert_proportionality: 1337,
700            ..Default::default()
701        };
702        let compute_proto_timely_config = TimelyConfig {
703            arrangement_exert_proportionality: ARRANGEMENT_EXERT_PROPORTIONALITY.get(&self.dyncfg),
704            enable_zero_copy: ENABLE_TIMELY_ZERO_COPY.get(&self.dyncfg),
705            enable_zero_copy_lgalloc: ENABLE_TIMELY_ZERO_COPY_LGALLOC.get(&self.dyncfg),
706            zero_copy_limit: TIMELY_ZERO_COPY_LIMIT.get(&self.dyncfg),
707            ..Default::default()
708        };
709
710        let mut disk_limit = location.allocation.disk_limit;
711        let memory_limit = location.allocation.memory_limit;
712        let mut memory_request = None;
713
714        if location.allocation.swap_enabled {
715            // The disk limit we specify in the service config decides whether or not the replica
716            // gets a scratch disk attached. We want to avoid attaching disks to swap replicas, so
717            // make sure to set the disk limit accordingly.
718            disk_limit = Some(DiskLimit::ZERO);
719
720            // We want to keep the memory request equal to the memory limit, to avoid
721            // over-provisioning and ensure replicas have predictable performance. However, to
722            // enable swap, Kubernetes currently requires that request and limit are different.
723            memory_request = memory_limit.map(|MemoryLimit(limit)| {
724                let request = ByteSize::b(limit.as_u64() - 1);
725                MemoryLimit(request)
726            });
727        }
728
729        let service = self.orchestrator.ensure_service(
730            &service_name,
731            ServiceConfig {
732                image: self.clusterd_image.clone(),
733                init_container_image: self.init_container_image.clone(),
734                args: Box::new(move |assigned| {
735                    let storage_timely_config = TimelyConfig {
736                        workers: location.allocation.workers.get(),
737                        addresses: assigned.peer_addresses("storage"),
738                        ..storage_proto_timely_config
739                    };
740                    let compute_timely_config = TimelyConfig {
741                        workers: location.allocation.workers.get(),
742                        addresses: assigned.peer_addresses("compute"),
743                        ..compute_proto_timely_config
744                    };
745
746                    let mut args = vec![
747                        format!(
748                            "--storage-controller-listen-addr={}",
749                            assigned.listen_addrs["storagectl"]
750                        ),
751                        format!(
752                            "--compute-controller-listen-addr={}",
753                            assigned.listen_addrs["computectl"]
754                        ),
755                        format!(
756                            "--internal-http-listen-addr={}",
757                            assigned.listen_addrs["internal-http"]
758                        ),
759                        format!("--opentelemetry-resource=cluster_id={}", cluster_id),
760                        format!("--opentelemetry-resource=replica_id={}", replica_id),
761                        format!("--persist-pubsub-url={}", persist_pubsub_url),
762                        format!("--environment-id={}", environment_id),
763                        format!(
764                            "--storage-timely-config={}",
765                            storage_timely_config.to_string(),
766                        ),
767                        format!(
768                            "--compute-timely-config={}",
769                            compute_timely_config.to_string(),
770                        ),
771                    ];
772                    if let Some(aws_external_id_prefix) = &aws_external_id_prefix {
773                        args.push(format!(
774                            "--aws-external-id-prefix={}",
775                            aws_external_id_prefix
776                        ));
777                    }
778                    if let Some(aws_connection_role_arn) = &aws_connection_role_arn {
779                        args.push(format!(
780                            "--aws-connection-role-arn={}",
781                            aws_connection_role_arn
782                        ));
783                    }
784                    if let Some(memory_limit) = location.allocation.memory_limit {
785                        args.push(format!(
786                            "--announce-memory-limit={}",
787                            memory_limit.0.as_u64()
788                        ));
789                    }
790                    if location.allocation.cpu_exclusive && enable_worker_core_affinity {
791                        args.push("--worker-core-affinity".into());
792                    }
793                    if enable_storage_introspection_logs {
794                        args.push("--enable-storage-introspection-logs".into());
795                    }
796                    if location.allocation.is_cc {
797                        args.push("--is-cc".into());
798                    }
799
800                    // If swap is enabled, make the replica limit its own heap usage based on the
801                    // configured memory and disk limits.
802                    if location.allocation.swap_enabled
803                        && let Some(memory_limit) = location.allocation.memory_limit
804                        && let Some(disk_limit) = location.allocation.disk_limit
805                        // Currently, the way for replica sizes to request unlimited swap is to
806                        // specify a `disk_limit` of 0. Ideally we'd change this to make them
807                        // specify no disk limit instead, but for now we need to special-case here.
808                        && disk_limit != DiskLimit::ZERO
809                    {
810                        let heap_limit = memory_limit.0 + disk_limit.0;
811                        args.push(format!("--heap-limit={}", heap_limit.as_u64()));
812                    }
813
814                    args.extend(secrets_args.clone());
815                    args
816                }),
817                ports: vec![
818                    ServicePort {
819                        name: "storagectl".into(),
820                        port_hint: 2100,
821                    },
822                    // To simplify the changes to tests, the port
823                    // chosen here is _after_ the compute ones.
824                    // TODO(petrosagg): fix the numerical ordering here
825                    ServicePort {
826                        name: "storage".into(),
827                        port_hint: 2103,
828                    },
829                    ServicePort {
830                        name: "computectl".into(),
831                        port_hint: 2101,
832                    },
833                    ServicePort {
834                        name: "compute".into(),
835                        port_hint: 2102,
836                    },
837                    ServicePort {
838                        name: "internal-http".into(),
839                        port_hint: 6878,
840                    },
841                ],
842                cpu_limit: location.allocation.cpu_limit,
843                cpu_request: location.allocation.cpu_request,
844                memory_limit,
845                memory_request,
846                scale: location.allocation.scale,
847                labels: BTreeMap::from([
848                    ("replica-id".into(), replica_id.to_string()),
849                    ("cluster-id".into(), cluster_id.to_string()),
850                    ("generation".into(), self.deploy_generation.to_string()),
851                    ("type".into(), "cluster".into()),
852                    ("replica-role".into(), role_label.into()),
853                    ("workers".into(), location.allocation.workers.to_string()),
854                    (
855                        "size".into(),
856                        location
857                            .size
858                            .to_string()
859                            .replace("=", "-")
860                            .replace(",", "_"),
861                    ),
862                ]),
863                annotations: BTreeMap::from([
864                    (
865                        "replica-name".into(),
866                        format!("{cluster_name}.{replica_name}"),
867                    ),
868                    ("cluster-name".into(), cluster_name),
869                ]),
870                // An empty list means no AZ constraint; a non-empty one pins
871                // placement to those zones.
872                availability_zones: Some(location.availability_zones).filter(|azs| !azs.is_empty()),
873                // This provides the orchestrator with some label selectors that
874                // are used to constraint the scheduling of replicas, based on
875                // its internal configuration.
876                //
877                // Selectors include `generation` so that scheduling constraints
878                // (anti-affinity, topology spread) only consider pods of the same
879                // deploy generation. Otherwise, during a generation rollout, the
880                // new-generation pods would be constrained by the placement of
881                // old-generation pods that are about to be torn down, which can
882                // prevent the new pods from scheduling (e.g., when only one AZ
883                // has capacity but it is already occupied by an old-generation
884                // pod).
885                other_replicas_selector: vec![
886                    LabelSelector {
887                        label_name: "cluster-id".to_string(),
888                        logic: LabelSelectionLogic::Eq {
889                            value: cluster_id.to_string(),
890                        },
891                    },
892                    // Select other replicas (but not oneself)
893                    LabelSelector {
894                        label_name: "replica-id".into(),
895                        logic: LabelSelectionLogic::NotEq {
896                            value: replica_id.to_string(),
897                        },
898                    },
899                    LabelSelector {
900                        label_name: "generation".into(),
901                        logic: LabelSelectionLogic::Eq {
902                            value: self.deploy_generation.to_string(),
903                        },
904                    },
905                ],
906                replicas_selector: vec![
907                    LabelSelector {
908                        label_name: "cluster-id".to_string(),
909                        // Select ALL replicas.
910                        logic: LabelSelectionLogic::Eq {
911                            value: cluster_id.to_string(),
912                        },
913                    },
914                    LabelSelector {
915                        label_name: "generation".into(),
916                        logic: LabelSelectionLogic::Eq {
917                            value: self.deploy_generation.to_string(),
918                        },
919                    },
920                ],
921                disk_limit,
922                node_selector: location.allocation.selectors,
923            },
924        )?;
925
926        let metrics_task = mz_ore::task::spawn(|| format!("replica-metrics-{replica_id}"), {
927            let tx = self.metrics_tx.clone();
928            let orchestrator = Arc::clone(&self.orchestrator);
929            let service_name = service_name.clone();
930            async move {
931                const METRICS_INTERVAL: Duration = Duration::from_secs(60);
932
933                // TODO[btv] -- I tried implementing a `watch_metrics` function,
934                // similar to `watch_services`, but it crashed due to
935                // https://github.com/kube-rs/kube/issues/1092 .
936                //
937                // If `metrics-server` can be made to fill in `resourceVersion`,
938                // or if that bug is fixed, we can try that again rather than using this inelegant
939                // loop.
940                let mut interval = tokio::time::interval(METRICS_INTERVAL);
941                loop {
942                    interval.tick().await;
943                    match orchestrator.fetch_service_metrics(&service_name).await {
944                        Ok(metrics) => {
945                            let _ = tx.send((replica_id, metrics));
946                        }
947                        Err(e) => {
948                            warn!("failed to get metrics for replica {replica_id}: {e}");
949                        }
950                    }
951                }
952            }
953        });
954
955        Ok((service, metrics_task.abort_on_drop()))
956    }
957
958    /// Deprovisions a replica with the service orchestrator.
959    fn deprovision_replica(
960        &self,
961        cluster_id: ClusterId,
962        replica_id: ReplicaId,
963        generation: u64,
964    ) -> Result<(), anyhow::Error> {
965        let service_name = ReplicaServiceName {
966            cluster_id,
967            replica_id,
968            generation,
969        }
970        .to_string();
971        self.orchestrator.drop_service(&service_name)
972    }
973}
974
975/// Remove all replicas from past generations.
976async fn try_remove_past_generation_replicas(
977    orchestrator: &dyn NamespacedOrchestrator,
978    deploy_generation: u64,
979) -> Result<(), anyhow::Error> {
980    let services: BTreeSet<_> = orchestrator.list_services().await?.into_iter().collect();
981
982    for service in services {
983        let name: ReplicaServiceName = service.parse()?;
984        if name.generation < deploy_generation {
985            info!(
986                cluster_id = %name.cluster_id,
987                replica_id = %name.replica_id,
988                "removing past generation replica",
989            );
990            orchestrator.drop_service(&service)?;
991        }
992    }
993
994    Ok(())
995}
996
997/// Represents the name of a cluster replica service in the orchestrator.
998#[derive(PartialEq, Eq, PartialOrd, Ord)]
999pub struct ReplicaServiceName {
1000    pub cluster_id: ClusterId,
1001    pub replica_id: ReplicaId,
1002    pub generation: u64,
1003}
1004
1005impl fmt::Display for ReplicaServiceName {
1006    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1007        let ReplicaServiceName {
1008            cluster_id,
1009            replica_id,
1010            generation,
1011        } = self;
1012        write!(f, "{cluster_id}-replica-{replica_id}-gen-{generation}")
1013    }
1014}
1015
1016impl FromStr for ReplicaServiceName {
1017    type Err = anyhow::Error;
1018
1019    fn from_str(s: &str) -> Result<Self, Self::Err> {
1020        static SERVICE_NAME_RE: LazyLock<Regex> = LazyLock::new(|| {
1021            Regex::new(r"(?-u)^([us]\d+)-replica-([us]\d+)(?:-gen-(\d+))?$").unwrap()
1022        });
1023
1024        let caps = SERVICE_NAME_RE
1025            .captures(s)
1026            .ok_or_else(|| anyhow!("invalid service name: {s}"))?;
1027
1028        Ok(ReplicaServiceName {
1029            cluster_id: caps.get(1).unwrap().as_str().parse().unwrap(),
1030            replica_id: caps.get(2).unwrap().as_str().parse().unwrap(),
1031            // Old versions of Materialize did not include generations in
1032            // replica service names. Synthesize generation 0 if absent.
1033            // TODO: remove this in the next version of Materialize.
1034            generation: caps.get(3).map_or("0", |m| m.as_str()).parse().unwrap(),
1035        })
1036    }
1037}