mz_orchestratord/controller/materialize/
balancer.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::time::Duration;
11
12use k8s_openapi::{
13    api::{
14        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15        core::v1::{
16            Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17            PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18            ServicePort, ServiceSpec, Volume, VolumeMount,
19        },
20    },
21    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28    controller::materialize::{
29        matching_image_from_environmentd_image_ref,
30        tls::{create_certificate, issuer_ref_defined},
31    },
32    k8s::{apply_resource, delete_resource, get_resource},
33};
34use mz_cloud_resources::crd::{
35    generated::cert_manager::certificates::Certificate, materialize::v1alpha1::Materialize,
36};
37use mz_ore::instrument;
38
39pub struct Resources {
40    balancerd_external_certificate: Box<Option<Certificate>>,
41    balancerd_deployment: Box<Deployment>,
42    balancerd_service: Box<Service>,
43}
44
45impl Resources {
46    pub fn new(config: &super::MaterializeControllerArgs, mz: &Materialize) -> Self {
47        let balancerd_external_certificate =
48            Box::new(create_balancerd_external_certificate(config, mz));
49        let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
50        let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
51
52        Self {
53            balancerd_external_certificate,
54            balancerd_deployment,
55            balancerd_service,
56        }
57    }
58
59    #[instrument]
60    pub async fn apply(
61        &self,
62        client: &Client,
63        namespace: &str,
64    ) -> Result<Option<Action>, anyhow::Error> {
65        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
66        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
67        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
68
69        if let Some(certificate) = &*self.balancerd_external_certificate {
70            trace!("creating new balancerd external certificate");
71            apply_resource(&certificate_api, certificate).await?;
72        }
73
74        trace!("creating new balancerd deployment");
75        apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
76
77        trace!("creating new balancerd service");
78        apply_resource(&service_api, &*self.balancerd_service).await?;
79
80        if let Some(deployment) =
81            get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
82        {
83            for condition in deployment
84                .status
85                .as_ref()
86                .and_then(|status| status.conditions.as_deref())
87                .unwrap_or(&[])
88            {
89                if condition.type_ == "Available" && condition.status == "True" {
90                    return Ok(None);
91                }
92            }
93        }
94
95        Ok(Some(Action::requeue(Duration::from_secs(1))))
96    }
97
98    #[instrument]
99    pub async fn cleanup(
100        &self,
101        client: &Client,
102        namespace: &str,
103    ) -> Result<Option<Action>, anyhow::Error> {
104        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
105        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
106        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
107
108        trace!("deleting balancerd service");
109        delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
110
111        trace!("deleting balancerd deployment");
112        delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
113
114        if let Some(certificate) = &*self.balancerd_external_certificate {
115            trace!("deleting balancerd external certificate");
116            delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
117        }
118
119        Ok(None)
120    }
121}
122
123fn create_balancerd_external_certificate(
124    config: &super::MaterializeControllerArgs,
125    mz: &Materialize,
126) -> Option<Certificate> {
127    create_certificate(
128        config.default_certificate_specs.balancerd_external.clone(),
129        mz,
130        mz.spec.balancerd_external_certificate_spec.clone(),
131        mz.balancerd_external_certificate_name(),
132        mz.balancerd_external_certificate_secret_name(),
133        None,
134    )
135}
136
137fn create_balancerd_deployment_object(
138    config: &super::MaterializeControllerArgs,
139    mz: &Materialize,
140) -> Deployment {
141    let security_context = if config.enable_security_context {
142        // Since we want to adhere to the most restrictive security context, all
143        // of these fields have to be set how they are.
144        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
145        Some(SecurityContext {
146            run_as_non_root: Some(true),
147            capabilities: Some(Capabilities {
148                drop: Some(vec!["ALL".to_string()]),
149                ..Default::default()
150            }),
151            seccomp_profile: Some(SeccompProfile {
152                type_: "RuntimeDefault".to_string(),
153                ..Default::default()
154            }),
155            allow_privilege_escalation: Some(false),
156            ..Default::default()
157        })
158    } else {
159        None
160    };
161
162    let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
163        Some(btreemap! {
164            "prometheus.io/scrape".to_owned() => "true".to_string(),
165            "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
166            "prometheus.io/path".to_owned() => "/metrics".to_string(),
167            "prometheus.io/scheme".to_owned() => "http".to_string(),
168        })
169    } else {
170        None
171    };
172    let mut pod_template_labels = mz.default_labels();
173    pod_template_labels.insert(
174        "materialize.cloud/name".to_owned(),
175        mz.balancerd_deployment_name(),
176    );
177    pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
178    pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
179
180    let ports = vec![
181        ContainerPort {
182            container_port: config.balancerd_sql_port.into(),
183            name: Some("pgwire".into()),
184            protocol: Some("TCP".into()),
185            ..Default::default()
186        },
187        ContainerPort {
188            container_port: config.balancerd_http_port.into(),
189            name: Some("http".into()),
190            protocol: Some("TCP".into()),
191            ..Default::default()
192        },
193        ContainerPort {
194            container_port: config.balancerd_internal_http_port.into(),
195            name: Some("internal-http".into()),
196            protocol: Some("TCP".into()),
197            ..Default::default()
198        },
199    ];
200
201    let mut args = vec![
202        "service".to_string(),
203        format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
204        format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
205        format!(
206            "--internal-http-listen-addr=0.0.0.0:{}",
207            config.balancerd_internal_http_port
208        ),
209        format!(
210            "--https-resolver-template={}.{}.svc.cluster.local:{}",
211            mz.environmentd_service_name(),
212            mz.namespace(),
213            config.environmentd_http_port
214        ),
215        format!(
216            "--static-resolver-addr={}.{}.svc.cluster.local:{}",
217            mz.environmentd_service_name(),
218            mz.namespace(),
219            config.environmentd_sql_port
220        ),
221    ];
222
223    if issuer_ref_defined(
224        &config.default_certificate_specs.internal,
225        &mz.spec.internal_certificate_spec,
226    ) {
227        args.push("--internal-tls".to_owned())
228    }
229
230    let mut volumes = Vec::new();
231    let mut volume_mounts = Vec::new();
232    if issuer_ref_defined(
233        &config.default_certificate_specs.balancerd_external,
234        &mz.spec.balancerd_external_certificate_spec,
235    ) {
236        volumes.push(Volume {
237            name: "external-certificate".to_owned(),
238            secret: Some(SecretVolumeSource {
239                default_mode: Some(0o400),
240                secret_name: Some(mz.balancerd_external_certificate_secret_name()),
241                items: None,
242                optional: Some(false),
243            }),
244            ..Default::default()
245        });
246        volume_mounts.push(VolumeMount {
247            name: "external-certificate".to_owned(),
248            mount_path: "/etc/external_tls".to_owned(),
249            read_only: Some(true),
250            ..Default::default()
251        });
252        args.extend([
253            "--tls-mode=require".into(),
254            "--tls-cert=/etc/external_tls/tls.crt".into(),
255            "--tls-key=/etc/external_tls/tls.key".into(),
256        ]);
257    } else {
258        args.push("--tls-mode=disable".to_string());
259    }
260
261    let startup_probe = Probe {
262        http_get: Some(HTTPGetAction {
263            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
264            path: Some("/api/readyz".into()),
265            ..Default::default()
266        }),
267        failure_threshold: Some(20),
268        initial_delay_seconds: Some(3),
269        period_seconds: Some(3),
270        success_threshold: Some(1),
271        timeout_seconds: Some(1),
272        ..Default::default()
273    };
274    let readiness_probe = Probe {
275        http_get: Some(HTTPGetAction {
276            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
277            path: Some("/api/readyz".into()),
278            ..Default::default()
279        }),
280        failure_threshold: Some(3),
281        period_seconds: Some(10),
282        success_threshold: Some(1),
283        timeout_seconds: Some(1),
284        ..Default::default()
285    };
286    let liveness_probe = Probe {
287        http_get: Some(HTTPGetAction {
288            port: IntOrString::Int(config.balancerd_internal_http_port.into()),
289            path: Some("/api/livez".into()),
290            ..Default::default()
291        }),
292        failure_threshold: Some(3),
293        initial_delay_seconds: Some(8),
294        period_seconds: Some(10),
295        success_threshold: Some(1),
296        timeout_seconds: Some(1),
297        ..Default::default()
298    };
299
300    let container = Container {
301        name: "balancerd".to_owned(),
302        image: Some(matching_image_from_environmentd_image_ref(
303            &mz.spec.environmentd_image_ref,
304            "balancerd",
305            None,
306        )),
307        image_pull_policy: Some(config.image_pull_policy.to_string()),
308        ports: Some(ports),
309        args: Some(args),
310        startup_probe: Some(startup_probe),
311        readiness_probe: Some(readiness_probe),
312        liveness_probe: Some(liveness_probe),
313        resources: mz.spec.balancerd_resource_requirements.clone(),
314        security_context: security_context.clone(),
315        volume_mounts: Some(volume_mounts),
316        ..Default::default()
317    };
318
319    let deployment_spec = DeploymentSpec {
320        replicas: Some(1),
321        selector: LabelSelector {
322            match_labels: Some(pod_template_labels.clone()),
323            ..Default::default()
324        },
325        strategy: Some(DeploymentStrategy {
326            rolling_update: Some(RollingUpdateDeployment {
327                // Allow a complete set of new pods at once, to minimize the
328                // chances of a new connection going to a pod that will be
329                // immediately drained
330                max_surge: Some(IntOrString::String("100%".into())),
331                ..Default::default()
332            }),
333            ..Default::default()
334        }),
335        template: PodTemplateSpec {
336            // not using managed_resource_meta because the pod should be
337            // owned by the deployment, not the materialize instance
338            metadata: Some(ObjectMeta {
339                annotations: pod_template_annotations,
340                labels: Some(pod_template_labels),
341                ..Default::default()
342            }),
343            spec: Some(PodSpec {
344                containers: vec![container],
345                node_selector: Some(
346                    config
347                        .balancerd_node_selector
348                        .iter()
349                        .map(|selector| (selector.key.clone(), selector.value.clone()))
350                        .collect(),
351                ),
352                affinity: config.balancerd_affinity.clone(),
353                tolerations: config.balancerd_tolerations.clone(),
354                security_context: Some(PodSecurityContext {
355                    fs_group: Some(999),
356                    run_as_user: Some(999),
357                    run_as_group: Some(999),
358                    ..Default::default()
359                }),
360                scheduler_name: config.scheduler_name.clone(),
361                service_account_name: Some(mz.service_account_name()),
362                volumes: Some(volumes),
363                ..Default::default()
364            }),
365        },
366        ..Default::default()
367    };
368
369    Deployment {
370        metadata: ObjectMeta {
371            ..mz.managed_resource_meta(mz.balancerd_deployment_name())
372        },
373        spec: Some(deployment_spec),
374        status: None,
375    }
376}
377
378fn create_balancerd_service_object(
379    config: &super::MaterializeControllerArgs,
380    mz: &Materialize,
381) -> Service {
382    let selector =
383        btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
384
385    let ports = vec![
386        ServicePort {
387            name: Some("http".to_string()),
388            protocol: Some("TCP".to_string()),
389            port: config.balancerd_http_port.into(),
390            target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
391            ..Default::default()
392        },
393        ServicePort {
394            name: Some("pgwire".to_string()),
395            protocol: Some("TCP".to_string()),
396            port: config.balancerd_sql_port.into(),
397            target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
398            ..Default::default()
399        },
400    ];
401
402    let spec = ServiceSpec {
403        type_: Some("ClusterIP".to_string()),
404        cluster_ip: Some("None".to_string()),
405        selector: Some(selector),
406        ports: Some(ports),
407        ..Default::default()
408    };
409
410    Service {
411        metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
412        spec: Some(spec),
413        status: None,
414    }
415}