mz_orchestratord/controller/materialize/
balancer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use k8s_openapi::{
13    api::{
14        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15        core::v1::{
16            Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17            PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18            ServicePort, ServiceSpec, Volume, VolumeMount,
19        },
20    },
21    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28    controller::materialize::{
29        matching_image_from_environmentd_image_ref,
30        tls::{create_certificate, issuer_ref_defined},
31    },
32    k8s::{apply_resource, delete_resource, get_resource},
33};
34use mz_cloud_resources::crd::{
35    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
36    materialize::v1alpha1::Materialize,
37};
38use mz_ore::instrument;
39
/// The Kubernetes objects managed for balancerd on behalf of a single
/// `Materialize` resource.
///
/// Instances are built by [`Resources::new`], pushed to the cluster by
/// [`Resources::apply`], and removed again by [`Resources::cleanup`].
pub struct Resources {
    /// Certificate for balancerd's external listeners, as produced by
    /// `create_certificate`; `None` when that helper yields no certificate.
    balancerd_external_certificate: Box<Option<Certificate>>,
    /// Deployment that runs the balancerd pods.
    balancerd_deployment: Box<Deployment>,
    /// Service that selects the balancerd pods.
    balancerd_service: Box<Service>,
}
45
46impl Resources {
47    pub fn new(config: &super::MaterializeControllerArgs, mz: &Materialize) -> Self {
48        let balancerd_external_certificate =
49            Box::new(create_balancerd_external_certificate(config, mz));
50        let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
51        let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
52
53        Self {
54            balancerd_external_certificate,
55            balancerd_deployment,
56            balancerd_service,
57        }
58    }
59
60    #[instrument]
61    pub async fn apply(
62        &self,
63        client: &Client,
64        namespace: &str,
65    ) -> Result<Option<Action>, anyhow::Error> {
66        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
67        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
68        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
69
70        if let Some(certificate) = &*self.balancerd_external_certificate {
71            trace!("creating new balancerd external certificate");
72            apply_resource(&certificate_api, certificate).await?;
73        }
74
75        trace!("creating new balancerd deployment");
76        apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
77
78        trace!("creating new balancerd service");
79        apply_resource(&service_api, &*self.balancerd_service).await?;
80
81        if let Some(deployment) =
82            get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
83        {
84            for condition in deployment
85                .status
86                .as_ref()
87                .and_then(|status| status.conditions.as_deref())
88                .unwrap_or(&[])
89            {
90                if condition.type_ == "Available" && condition.status == "True" {
91                    return Ok(None);
92                }
93            }
94        }
95
96        Ok(Some(Action::requeue(Duration::from_secs(1))))
97    }
98
99    #[instrument]
100    pub async fn cleanup(
101        &self,
102        client: &Client,
103        namespace: &str,
104    ) -> Result<Option<Action>, anyhow::Error> {
105        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
106        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
107        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
108
109        trace!("deleting balancerd service");
110        delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
111
112        trace!("deleting balancerd deployment");
113        delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
114
115        if let Some(certificate) = &*self.balancerd_external_certificate {
116            trace!("deleting balancerd external certificate");
117            delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
118        }
119
120        Ok(None)
121    }
122}
123
124fn create_balancerd_external_certificate(
125    config: &super::MaterializeControllerArgs,
126    mz: &Materialize,
127) -> Option<Certificate> {
128    create_certificate(
129        config.default_certificate_specs.balancerd_external.clone(),
130        mz,
131        mz.spec.balancerd_external_certificate_spec.clone(),
132        mz.balancerd_external_certificate_name(),
133        mz.balancerd_external_certificate_secret_name(),
134        None,
135        CertificatePrivateKeyAlgorithm::Rsa,
136        Some(4096),
137    )
138}
139
140fn create_balancerd_deployment_object(
141    config: &super::MaterializeControllerArgs,
142    mz: &Materialize,
143) -> Deployment {
144    let security_context = if config.enable_security_context {
145        // Since we want to adhere to the most restrictive security context, all
146        // of these fields have to be set how they are.
147        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
148        Some(SecurityContext {
149            run_as_non_root: Some(true),
150            capabilities: Some(Capabilities {
151                drop: Some(vec!["ALL".to_string()]),
152                ..Default::default()
153            }),
154            seccomp_profile: Some(SeccompProfile {
155                type_: "RuntimeDefault".to_string(),
156                ..Default::default()
157            }),
158            allow_privilege_escalation: Some(false),
159            ..Default::default()
160        })
161    } else {
162        None
163    };
164
165    let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
166        Some(btreemap! {
167            "prometheus.io/scrape".to_owned() => "true".to_string(),
168            "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
169            "prometheus.io/path".to_owned() => "/metrics".to_string(),
170            "prometheus.io/scheme".to_owned() => "http".to_string(),
171        })
172    } else {
173        None
174    };
175    let mut pod_template_labels = mz.default_labels();
176    pod_template_labels.insert(
177        "materialize.cloud/name".to_owned(),
178        mz.balancerd_deployment_name(),
179    );
180    pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
181    pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
182
183    let ports = vec![
184        ContainerPort {
185            container_port: config.balancerd_sql_port.into(),
186            name: Some("pgwire".into()),
187            protocol: Some("TCP".into()),
188            ..Default::default()
189        },
190        ContainerPort {
191            container_port: config.balancerd_http_port.into(),
192            name: Some("http".into()),
193            protocol: Some("TCP".into()),
194            ..Default::default()
195        },
196        ContainerPort {
197            container_port: config.balancerd_internal_http_port.into(),
198            name: Some("internal-http".into()),
199            protocol: Some("TCP".into()),
200            ..Default::default()
201        },
202    ];
203
204    let mut args = vec![
205        "service".to_string(),
206        format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
207        format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
208        format!(
209            "--internal-http-listen-addr=0.0.0.0:{}",
210            config.balancerd_internal_http_port
211        ),
212        format!(
213            "--https-resolver-template={}.{}.svc.cluster.local:{}",
214            mz.environmentd_service_name(),
215            mz.namespace(),
216            config.environmentd_http_port
217        ),
218        format!(
219            "--static-resolver-addr={}.{}.svc.cluster.local:{}",
220            mz.environmentd_service_name(),
221            mz.namespace(),
222            config.environmentd_sql_port
223        ),
224    ];
225
226    if issuer_ref_defined(
227        &config.default_certificate_specs.internal,
228        &mz.spec.internal_certificate_spec,
229    ) {
230        args.push("--internal-tls".to_owned())
231    }
232
233    let mut volumes = Vec::new();
234    let mut volume_mounts = Vec::new();
235    if issuer_ref_defined(
236        &config.default_certificate_specs.balancerd_external,
237        &mz.spec.balancerd_external_certificate_spec,
238    ) {
239        volumes.push(Volume {
240            name: "external-certificate".to_owned(),
241            secret: Some(SecretVolumeSource {
242                default_mode: Some(0o400),
243                secret_name: Some(mz.balancerd_external_certificate_secret_name()),
244                items: None,
245                optional: Some(false),
246            }),
247            ..Default::default()
248        });
249        volume_mounts.push(VolumeMount {
250            name: "external-certificate".to_owned(),
251            mount_path: "/etc/external_tls".to_owned(),
252            read_only: Some(true),
253            ..Default::default()
254        });
255        args.extend([
256            "--tls-mode=require".into(),
257            "--tls-cert=/etc/external_tls/tls.crt".into(),
258            "--tls-key=/etc/external_tls/tls.key".into(),
259        ]);
260    } else {
261        args.push("--tls-mode=disable".to_string());
262    }
263
264    let startup_probe = Probe {
265        http_get: Some(HTTPGetAction {
266            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
267            path: Some("/api/readyz".into()),
268            ..Default::default()
269        }),
270        failure_threshold: Some(20),
271        initial_delay_seconds: Some(3),
272        period_seconds: Some(3),
273        success_threshold: Some(1),
274        timeout_seconds: Some(1),
275        ..Default::default()
276    };
277    let readiness_probe = Probe {
278        http_get: Some(HTTPGetAction {
279            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
280            path: Some("/api/readyz".into()),
281            ..Default::default()
282        }),
283        failure_threshold: Some(3),
284        period_seconds: Some(10),
285        success_threshold: Some(1),
286        timeout_seconds: Some(1),
287        ..Default::default()
288    };
289    let liveness_probe = Probe {
290        http_get: Some(HTTPGetAction {
291            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
292            path: Some("/api/livez".into()),
293            ..Default::default()
294        }),
295        failure_threshold: Some(3),
296        initial_delay_seconds: Some(8),
297        period_seconds: Some(10),
298        success_threshold: Some(1),
299        timeout_seconds: Some(1),
300        ..Default::default()
301    };
302
303    let container = Container {
304        name: "balancerd".to_owned(),
305        image: Some(matching_image_from_environmentd_image_ref(
306            &mz.spec.environmentd_image_ref,
307            "balancerd",
308            None,
309        )),
310        image_pull_policy: Some(config.image_pull_policy.to_string()),
311        ports: Some(ports),
312        args: Some(args),
313        startup_probe: Some(startup_probe),
314        readiness_probe: Some(readiness_probe),
315        liveness_probe: Some(liveness_probe),
316        resources: mz
317            .spec
318            .balancerd_resource_requirements
319            .clone()
320            .or_else(|| config.balancerd_default_resources.clone()),
321        security_context: security_context.clone(),
322        volume_mounts: Some(volume_mounts),
323        ..Default::default()
324    };
325
326    let deployment_spec = DeploymentSpec {
327        replicas: Some(mz.balancerd_replicas()),
328        selector: LabelSelector {
329            match_labels: Some(pod_template_labels.clone()),
330            ..Default::default()
331        },
332        strategy: Some(DeploymentStrategy {
333            rolling_update: Some(RollingUpdateDeployment {
334                // Allow a complete set of new pods at once, to minimize the
335                // chances of a new connection going to a pod that will be
336                // immediately drained
337                max_surge: Some(IntOrString::String("100%".into())),
338                ..Default::default()
339            }),
340            ..Default::default()
341        }),
342        template: PodTemplateSpec {
343            // not using managed_resource_meta because the pod should be
344            // owned by the deployment, not the materialize instance
345            metadata: Some(ObjectMeta {
346                annotations: pod_template_annotations,
347                labels: Some(pod_template_labels),
348                ..Default::default()
349            }),
350            spec: Some(PodSpec {
351                containers: vec![container],
352                node_selector: Some(
353                    config
354                        .balancerd_node_selector
355                        .iter()
356                        .map(|selector| (selector.key.clone(), selector.value.clone()))
357                        .collect(),
358                ),
359                affinity: config.balancerd_affinity.clone(),
360                tolerations: config.balancerd_tolerations.clone(),
361                security_context: Some(PodSecurityContext {
362                    fs_group: Some(999),
363                    run_as_user: Some(999),
364                    run_as_group: Some(999),
365                    ..Default::default()
366                }),
367                scheduler_name: config.scheduler_name.clone(),
368                service_account_name: Some(mz.service_account_name()),
369                volumes: Some(volumes),
370                ..Default::default()
371            }),
372        },
373        ..Default::default()
374    };
375
376    Deployment {
377        metadata: ObjectMeta {
378            ..mz.managed_resource_meta(mz.balancerd_deployment_name())
379        },
380        spec: Some(deployment_spec),
381        status: None,
382    }
383}
384
385fn create_balancerd_service_object(
386    config: &super::MaterializeControllerArgs,
387    mz: &Materialize,
388) -> Service {
389    let selector =
390        btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
391
392    let ports = vec![
393        ServicePort {
394            name: Some("http".to_string()),
395            protocol: Some("TCP".to_string()),
396            port: config.balancerd_http_port.into(),
397            target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
398            ..Default::default()
399        },
400        ServicePort {
401            name: Some("pgwire".to_string()),
402            protocol: Some("TCP".to_string()),
403            port: config.balancerd_sql_port.into(),
404            target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
405            ..Default::default()
406        },
407    ];
408
409    let spec = ServiceSpec {
410        type_: Some("ClusterIP".to_string()),
411        cluster_ip: Some("None".to_string()),
412        selector: Some(selector),
413        ports: Some(ports),
414        ..Default::default()
415    };
416
417    Service {
418        metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
419        spec: Some(spec),
420        status: None,
421    }
422}