mz_orchestratord/controller/materialize/
balancer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use k8s_openapi::{
13    api::{
14        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15        core::v1::{
16            Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17            PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18            ServicePort, ServiceSpec, Volume, VolumeMount,
19        },
20    },
21    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28    controller::materialize::{
29        Error, matching_image_from_environmentd_image_ref,
30        tls::{create_certificate, issuer_ref_defined},
31    },
32    k8s::{apply_resource, delete_resource, get_resource},
33};
34use mz_cloud_resources::crd::{
35    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
36    materialize::v1alpha1::Materialize,
37};
38use mz_ore::instrument;
39
40pub struct Resources {
41    balancerd_external_certificate: Box<Option<Certificate>>,
42    balancerd_deployment: Box<Deployment>,
43    balancerd_service: Box<Service>,
44}
45
46impl Resources {
47    pub fn new(config: &super::MaterializeControllerArgs, mz: &Materialize) -> Self {
48        let balancerd_external_certificate =
49            Box::new(create_balancerd_external_certificate(config, mz));
50        let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
51        let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
52
53        Self {
54            balancerd_external_certificate,
55            balancerd_deployment,
56            balancerd_service,
57        }
58    }
59
60    #[instrument]
61    pub async fn apply(&self, client: &Client, namespace: &str) -> Result<Option<Action>, Error> {
62        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
63        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
64        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
65
66        if let Some(certificate) = &*self.balancerd_external_certificate {
67            trace!("creating new balancerd external certificate");
68            apply_resource(&certificate_api, certificate).await?;
69        }
70
71        trace!("creating new balancerd deployment");
72        apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
73
74        trace!("creating new balancerd service");
75        apply_resource(&service_api, &*self.balancerd_service).await?;
76
77        if let Some(deployment) =
78            get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
79        {
80            for condition in deployment
81                .status
82                .as_ref()
83                .and_then(|status| status.conditions.as_deref())
84                .unwrap_or(&[])
85            {
86                if condition.type_ == "Available" && condition.status == "True" {
87                    return Ok(None);
88                }
89            }
90        }
91
92        Ok(Some(Action::requeue(Duration::from_secs(1))))
93    }
94
95    #[instrument]
96    pub async fn cleanup(
97        &self,
98        client: &Client,
99        namespace: &str,
100    ) -> Result<Option<Action>, anyhow::Error> {
101        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
102        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
103        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
104
105        trace!("deleting balancerd service");
106        delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
107
108        trace!("deleting balancerd deployment");
109        delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
110
111        if let Some(certificate) = &*self.balancerd_external_certificate {
112            trace!("deleting balancerd external certificate");
113            delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
114        }
115
116        Ok(None)
117    }
118}
119
120fn create_balancerd_external_certificate(
121    config: &super::MaterializeControllerArgs,
122    mz: &Materialize,
123) -> Option<Certificate> {
124    create_certificate(
125        config.default_certificate_specs.balancerd_external.clone(),
126        mz,
127        mz.spec.balancerd_external_certificate_spec.clone(),
128        mz.balancerd_external_certificate_name(),
129        mz.balancerd_external_certificate_secret_name(),
130        None,
131        CertificatePrivateKeyAlgorithm::Rsa,
132        Some(4096),
133    )
134}
135
136fn create_balancerd_deployment_object(
137    config: &super::MaterializeControllerArgs,
138    mz: &Materialize,
139) -> Deployment {
140    let security_context = if config.enable_security_context {
141        // Since we want to adhere to the most restrictive security context, all
142        // of these fields have to be set how they are.
143        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
144        Some(SecurityContext {
145            run_as_non_root: Some(true),
146            capabilities: Some(Capabilities {
147                drop: Some(vec!["ALL".to_string()]),
148                ..Default::default()
149            }),
150            seccomp_profile: Some(SeccompProfile {
151                type_: "RuntimeDefault".to_string(),
152                ..Default::default()
153            }),
154            allow_privilege_escalation: Some(false),
155            ..Default::default()
156        })
157    } else {
158        None
159    };
160
161    let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
162        Some(btreemap! {
163            "prometheus.io/scrape".to_owned() => "true".to_string(),
164            "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
165            "prometheus.io/path".to_owned() => "/metrics".to_string(),
166            "prometheus.io/scheme".to_owned() => "http".to_string(),
167        })
168    } else {
169        None
170    };
171    let mut pod_template_labels = mz.default_labels();
172    pod_template_labels.insert(
173        "materialize.cloud/name".to_owned(),
174        mz.balancerd_deployment_name(),
175    );
176    pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
177    pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
178
179    let ports = vec![
180        ContainerPort {
181            container_port: config.balancerd_sql_port.into(),
182            name: Some("pgwire".into()),
183            protocol: Some("TCP".into()),
184            ..Default::default()
185        },
186        ContainerPort {
187            container_port: config.balancerd_http_port.into(),
188            name: Some("http".into()),
189            protocol: Some("TCP".into()),
190            ..Default::default()
191        },
192        ContainerPort {
193            container_port: config.balancerd_internal_http_port.into(),
194            name: Some("internal-http".into()),
195            protocol: Some("TCP".into()),
196            ..Default::default()
197        },
198    ];
199
200    let mut args = vec![
201        "service".to_string(),
202        format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
203        format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
204        format!(
205            "--internal-http-listen-addr=0.0.0.0:{}",
206            config.balancerd_internal_http_port
207        ),
208        format!(
209            "--https-resolver-template={}.{}.svc.cluster.local:{}",
210            mz.environmentd_service_name(),
211            mz.namespace(),
212            config.environmentd_http_port
213        ),
214        format!(
215            "--static-resolver-addr={}.{}.svc.cluster.local:{}",
216            mz.environmentd_service_name(),
217            mz.namespace(),
218            config.environmentd_sql_port
219        ),
220    ];
221
222    if issuer_ref_defined(
223        &config.default_certificate_specs.internal,
224        &mz.spec.internal_certificate_spec,
225    ) {
226        args.push("--internal-tls".to_owned())
227    }
228
229    let mut volumes = Vec::new();
230    let mut volume_mounts = Vec::new();
231    if issuer_ref_defined(
232        &config.default_certificate_specs.balancerd_external,
233        &mz.spec.balancerd_external_certificate_spec,
234    ) {
235        volumes.push(Volume {
236            name: "external-certificate".to_owned(),
237            secret: Some(SecretVolumeSource {
238                default_mode: Some(0o400),
239                secret_name: Some(mz.balancerd_external_certificate_secret_name()),
240                items: None,
241                optional: Some(false),
242            }),
243            ..Default::default()
244        });
245        volume_mounts.push(VolumeMount {
246            name: "external-certificate".to_owned(),
247            mount_path: "/etc/external_tls".to_owned(),
248            read_only: Some(true),
249            ..Default::default()
250        });
251        args.extend([
252            "--tls-mode=require".into(),
253            "--tls-cert=/etc/external_tls/tls.crt".into(),
254            "--tls-key=/etc/external_tls/tls.key".into(),
255        ]);
256    } else {
257        args.push("--tls-mode=disable".to_string());
258    }
259
260    let startup_probe = Probe {
261        http_get: Some(HTTPGetAction {
262            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
263            path: Some("/api/readyz".into()),
264            ..Default::default()
265        }),
266        failure_threshold: Some(20),
267        initial_delay_seconds: Some(3),
268        period_seconds: Some(3),
269        success_threshold: Some(1),
270        timeout_seconds: Some(1),
271        ..Default::default()
272    };
273    let readiness_probe = Probe {
274        http_get: Some(HTTPGetAction {
275            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
276            path: Some("/api/readyz".into()),
277            ..Default::default()
278        }),
279        failure_threshold: Some(3),
280        period_seconds: Some(10),
281        success_threshold: Some(1),
282        timeout_seconds: Some(1),
283        ..Default::default()
284    };
285    let liveness_probe = Probe {
286        http_get: Some(HTTPGetAction {
287            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
288            path: Some("/api/livez".into()),
289            ..Default::default()
290        }),
291        failure_threshold: Some(3),
292        initial_delay_seconds: Some(8),
293        period_seconds: Some(10),
294        success_threshold: Some(1),
295        timeout_seconds: Some(1),
296        ..Default::default()
297    };
298
299    let container = Container {
300        name: "balancerd".to_owned(),
301        image: Some(matching_image_from_environmentd_image_ref(
302            &mz.spec.environmentd_image_ref,
303            "balancerd",
304            None,
305        )),
306        image_pull_policy: Some(config.image_pull_policy.to_string()),
307        ports: Some(ports),
308        args: Some(args),
309        startup_probe: Some(startup_probe),
310        readiness_probe: Some(readiness_probe),
311        liveness_probe: Some(liveness_probe),
312        resources: mz
313            .spec
314            .balancerd_resource_requirements
315            .clone()
316            .or_else(|| config.balancerd_default_resources.clone()),
317        security_context: security_context.clone(),
318        volume_mounts: Some(volume_mounts),
319        ..Default::default()
320    };
321
322    let deployment_spec = DeploymentSpec {
323        replicas: Some(mz.balancerd_replicas()),
324        selector: LabelSelector {
325            match_labels: Some(pod_template_labels.clone()),
326            ..Default::default()
327        },
328        strategy: Some(DeploymentStrategy {
329            rolling_update: Some(RollingUpdateDeployment {
330                // Allow a complete set of new pods at once, to minimize the
331                // chances of a new connection going to a pod that will be
332                // immediately drained
333                max_surge: Some(IntOrString::String("100%".into())),
334                ..Default::default()
335            }),
336            ..Default::default()
337        }),
338        template: PodTemplateSpec {
339            // not using managed_resource_meta because the pod should be
340            // owned by the deployment, not the materialize instance
341            metadata: Some(ObjectMeta {
342                annotations: pod_template_annotations,
343                labels: Some(pod_template_labels),
344                ..Default::default()
345            }),
346            spec: Some(PodSpec {
347                containers: vec![container],
348                node_selector: Some(
349                    config
350                        .balancerd_node_selector
351                        .iter()
352                        .map(|selector| (selector.key.clone(), selector.value.clone()))
353                        .collect(),
354                ),
355                affinity: config.balancerd_affinity.clone(),
356                tolerations: config.balancerd_tolerations.clone(),
357                security_context: Some(PodSecurityContext {
358                    fs_group: Some(999),
359                    run_as_user: Some(999),
360                    run_as_group: Some(999),
361                    ..Default::default()
362                }),
363                scheduler_name: config.scheduler_name.clone(),
364                service_account_name: Some(mz.service_account_name()),
365                volumes: Some(volumes),
366                ..Default::default()
367            }),
368        },
369        ..Default::default()
370    };
371
372    Deployment {
373        metadata: ObjectMeta {
374            ..mz.managed_resource_meta(mz.balancerd_deployment_name())
375        },
376        spec: Some(deployment_spec),
377        status: None,
378    }
379}
380
381fn create_balancerd_service_object(
382    config: &super::MaterializeControllerArgs,
383    mz: &Materialize,
384) -> Service {
385    let selector =
386        btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
387
388    let ports = vec![
389        ServicePort {
390            name: Some("http".to_string()),
391            protocol: Some("TCP".to_string()),
392            port: config.balancerd_http_port.into(),
393            target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
394            ..Default::default()
395        },
396        ServicePort {
397            name: Some("pgwire".to_string()),
398            protocol: Some("TCP".to_string()),
399            port: config.balancerd_sql_port.into(),
400            target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
401            ..Default::default()
402        },
403    ];
404
405    let spec = ServiceSpec {
406        type_: Some("ClusterIP".to_string()),
407        cluster_ip: Some("None".to_string()),
408        selector: Some(selector),
409        ports: Some(ports),
410        ..Default::default()
411    };
412
413    Service {
414        metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
415        spec: Some(spec),
416        status: None,
417    }
418}