mz_orchestratord/controller/materialize/balancer.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use k8s_openapi::{
13    api::{
14        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15        core::v1::{
16            Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17            PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18            ServicePort, ServiceSpec, Volume, VolumeMount,
19        },
20    },
21    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28    controller::materialize::{Error, matching_image_from_environmentd_image_ref},
29    k8s::{apply_resource, delete_resource, get_resource},
30    tls::{create_certificate, issuer_ref_defined},
31};
32use mz_cloud_resources::crd::{
33    ManagedResource,
34    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
35    materialize::v1alpha1::Materialize,
36};
37use mz_ore::instrument;
38
39pub struct Resources {
40    balancerd_external_certificate: Box<Option<Certificate>>,
41    balancerd_deployment: Box<Deployment>,
42    balancerd_service: Box<Service>,
43}
44
45impl Resources {
46    pub fn new(config: &super::Config, mz: &Materialize) -> Self {
47        let balancerd_external_certificate =
48            Box::new(create_balancerd_external_certificate(config, mz));
49        let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
50        let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
51
52        Self {
53            balancerd_external_certificate,
54            balancerd_deployment,
55            balancerd_service,
56        }
57    }
58
59    #[instrument]
60    pub async fn apply(&self, client: &Client, namespace: &str) -> Result<Option<Action>, Error> {
61        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
62        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
63        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
64
65        if let Some(certificate) = &*self.balancerd_external_certificate {
66            trace!("creating new balancerd external certificate");
67            apply_resource(&certificate_api, certificate).await?;
68        }
69
70        trace!("creating new balancerd deployment");
71        apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
72
73        trace!("creating new balancerd service");
74        apply_resource(&service_api, &*self.balancerd_service).await?;
75
76        if let Some(deployment) =
77            get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
78        {
79            for condition in deployment
80                .status
81                .as_ref()
82                .and_then(|status| status.conditions.as_deref())
83                .unwrap_or(&[])
84            {
85                if condition.type_ == "Available" && condition.status == "True" {
86                    return Ok(None);
87                }
88            }
89        }
90
91        Ok(Some(Action::requeue(Duration::from_secs(1))))
92    }
93
94    #[instrument]
95    pub async fn cleanup(
96        &self,
97        client: &Client,
98        namespace: &str,
99    ) -> Result<Option<Action>, anyhow::Error> {
100        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
101        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
102        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
103
104        trace!("deleting balancerd service");
105        delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
106
107        trace!("deleting balancerd deployment");
108        delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
109
110        if let Some(certificate) = &*self.balancerd_external_certificate {
111            trace!("deleting balancerd external certificate");
112            delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
113        }
114
115        Ok(None)
116    }
117}
118
119fn create_balancerd_external_certificate(
120    config: &super::Config,
121    mz: &Materialize,
122) -> Option<Certificate> {
123    create_certificate(
124        config.default_certificate_specs.balancerd_external.clone(),
125        mz,
126        mz.spec.balancerd_external_certificate_spec.clone(),
127        mz.balancerd_external_certificate_name(),
128        mz.balancerd_external_certificate_secret_name(),
129        None,
130        CertificatePrivateKeyAlgorithm::Rsa,
131        Some(4096),
132    )
133}
134
135fn create_balancerd_deployment_object(config: &super::Config, mz: &Materialize) -> Deployment {
136    let security_context = if config.enable_security_context {
137        // Since we want to adhere to the most restrictive security context, all
138        // of these fields have to be set how they are.
139        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
140        Some(SecurityContext {
141            run_as_non_root: Some(true),
142            capabilities: Some(Capabilities {
143                drop: Some(vec!["ALL".to_string()]),
144                ..Default::default()
145            }),
146            seccomp_profile: Some(SeccompProfile {
147                type_: "RuntimeDefault".to_string(),
148                ..Default::default()
149            }),
150            allow_privilege_escalation: Some(false),
151            ..Default::default()
152        })
153    } else {
154        None
155    };
156
157    let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
158        Some(btreemap! {
159            "prometheus.io/scrape".to_owned() => "true".to_string(),
160            "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
161            "prometheus.io/path".to_owned() => "/metrics".to_string(),
162            "prometheus.io/scheme".to_owned() => "http".to_string(),
163        })
164    } else {
165        None
166    };
167    let mut pod_template_labels = mz.default_labels();
168    pod_template_labels.insert(
169        "materialize.cloud/name".to_owned(),
170        mz.balancerd_deployment_name(),
171    );
172    pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
173    pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
174
175    let ports = vec![
176        ContainerPort {
177            container_port: config.balancerd_sql_port.into(),
178            name: Some("pgwire".into()),
179            protocol: Some("TCP".into()),
180            ..Default::default()
181        },
182        ContainerPort {
183            container_port: config.balancerd_http_port.into(),
184            name: Some("http".into()),
185            protocol: Some("TCP".into()),
186            ..Default::default()
187        },
188        ContainerPort {
189            container_port: config.balancerd_internal_http_port.into(),
190            name: Some("internal-http".into()),
191            protocol: Some("TCP".into()),
192            ..Default::default()
193        },
194    ];
195
196    let mut args = vec![
197        "service".to_string(),
198        format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
199        format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
200        format!(
201            "--internal-http-listen-addr=0.0.0.0:{}",
202            config.balancerd_internal_http_port
203        ),
204        format!(
205            "--https-resolver-template={}.{}.svc.cluster.local:{}",
206            mz.environmentd_service_name(),
207            mz.namespace(),
208            config.environmentd_http_port
209        ),
210        format!(
211            "--static-resolver-addr={}.{}.svc.cluster.local:{}",
212            mz.environmentd_service_name(),
213            mz.namespace(),
214            config.environmentd_sql_port
215        ),
216    ];
217
218    if issuer_ref_defined(
219        &config.default_certificate_specs.internal,
220        &mz.spec.internal_certificate_spec,
221    ) {
222        args.push("--internal-tls".to_owned())
223    }
224
225    let mut volumes = Vec::new();
226    let mut volume_mounts = Vec::new();
227    if issuer_ref_defined(
228        &config.default_certificate_specs.balancerd_external,
229        &mz.spec.balancerd_external_certificate_spec,
230    ) {
231        volumes.push(Volume {
232            name: "external-certificate".to_owned(),
233            secret: Some(SecretVolumeSource {
234                default_mode: Some(0o400),
235                secret_name: Some(mz.balancerd_external_certificate_secret_name()),
236                items: None,
237                optional: Some(false),
238            }),
239            ..Default::default()
240        });
241        volume_mounts.push(VolumeMount {
242            name: "external-certificate".to_owned(),
243            mount_path: "/etc/external_tls".to_owned(),
244            read_only: Some(true),
245            ..Default::default()
246        });
247        args.extend([
248            "--tls-mode=require".into(),
249            "--tls-cert=/etc/external_tls/tls.crt".into(),
250            "--tls-key=/etc/external_tls/tls.key".into(),
251        ]);
252    } else {
253        args.push("--tls-mode=disable".to_string());
254    }
255
256    let startup_probe = Probe {
257        http_get: Some(HTTPGetAction {
258            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
259            path: Some("/api/readyz".into()),
260            ..Default::default()
261        }),
262        failure_threshold: Some(20),
263        initial_delay_seconds: Some(3),
264        period_seconds: Some(3),
265        success_threshold: Some(1),
266        timeout_seconds: Some(1),
267        ..Default::default()
268    };
269    let readiness_probe = Probe {
270        http_get: Some(HTTPGetAction {
271            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
272            path: Some("/api/readyz".into()),
273            ..Default::default()
274        }),
275        failure_threshold: Some(3),
276        period_seconds: Some(10),
277        success_threshold: Some(1),
278        timeout_seconds: Some(1),
279        ..Default::default()
280    };
281    let liveness_probe = Probe {
282        http_get: Some(HTTPGetAction {
283            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
284            path: Some("/api/livez".into()),
285            ..Default::default()
286        }),
287        failure_threshold: Some(3),
288        initial_delay_seconds: Some(8),
289        period_seconds: Some(10),
290        success_threshold: Some(1),
291        timeout_seconds: Some(1),
292        ..Default::default()
293    };
294
295    let container = Container {
296        name: "balancerd".to_owned(),
297        image: Some(matching_image_from_environmentd_image_ref(
298            &mz.spec.environmentd_image_ref,
299            "balancerd",
300            None,
301        )),
302        image_pull_policy: Some(config.image_pull_policy.to_string()),
303        ports: Some(ports),
304        args: Some(args),
305        startup_probe: Some(startup_probe),
306        readiness_probe: Some(readiness_probe),
307        liveness_probe: Some(liveness_probe),
308        resources: mz
309            .spec
310            .balancerd_resource_requirements
311            .clone()
312            .or_else(|| config.balancerd_default_resources.clone()),
313        security_context: security_context.clone(),
314        volume_mounts: Some(volume_mounts),
315        ..Default::default()
316    };
317
318    let deployment_spec = DeploymentSpec {
319        replicas: Some(mz.balancerd_replicas()),
320        selector: LabelSelector {
321            match_labels: Some(pod_template_labels.clone()),
322            ..Default::default()
323        },
324        strategy: Some(DeploymentStrategy {
325            rolling_update: Some(RollingUpdateDeployment {
326                // Allow a complete set of new pods at once, to minimize the
327                // chances of a new connection going to a pod that will be
328                // immediately drained
329                max_surge: Some(IntOrString::String("100%".into())),
330                ..Default::default()
331            }),
332            ..Default::default()
333        }),
334        template: PodTemplateSpec {
335            // not using managed_resource_meta because the pod should be
336            // owned by the deployment, not the materialize instance
337            metadata: Some(ObjectMeta {
338                annotations: pod_template_annotations,
339                labels: Some(pod_template_labels),
340                ..Default::default()
341            }),
342            spec: Some(PodSpec {
343                containers: vec![container],
344                node_selector: Some(
345                    config
346                        .balancerd_node_selector
347                        .iter()
348                        .map(|selector| (selector.key.clone(), selector.value.clone()))
349                        .collect(),
350                ),
351                affinity: config.balancerd_affinity.clone(),
352                tolerations: config.balancerd_tolerations.clone(),
353                security_context: Some(PodSecurityContext {
354                    fs_group: Some(999),
355                    run_as_user: Some(999),
356                    run_as_group: Some(999),
357                    ..Default::default()
358                }),
359                scheduler_name: config.scheduler_name.clone(),
360                service_account_name: Some(mz.service_account_name()),
361                volumes: Some(volumes),
362                ..Default::default()
363            }),
364        },
365        ..Default::default()
366    };
367
368    Deployment {
369        metadata: ObjectMeta {
370            ..mz.managed_resource_meta(mz.balancerd_deployment_name())
371        },
372        spec: Some(deployment_spec),
373        status: None,
374    }
375}
376
377fn create_balancerd_service_object(config: &super::Config, mz: &Materialize) -> Service {
378    let selector =
379        btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
380
381    let ports = vec![
382        ServicePort {
383            name: Some("http".to_string()),
384            protocol: Some("TCP".to_string()),
385            port: config.balancerd_http_port.into(),
386            target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
387            ..Default::default()
388        },
389        ServicePort {
390            name: Some("pgwire".to_string()),
391            protocol: Some("TCP".to_string()),
392            port: config.balancerd_sql_port.into(),
393            target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
394            ..Default::default()
395        },
396    ];
397
398    let spec = ServiceSpec {
399        type_: Some("ClusterIP".to_string()),
400        cluster_ip: Some("None".to_string()),
401        selector: Some(selector),
402        ports: Some(ports),
403        ..Default::default()
404    };
405
406    Service {
407        metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
408        spec: Some(spec),
409        status: None,
410    }
411}