Skip to main content

mz_orchestratord/controller/
balancer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::bail;
11use k8s_controller::TraceMetadata;
12use k8s_openapi::{
13    api::{
14        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15        core::v1::{
16            Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
17            PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
18            SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
19            Volume, VolumeMount,
20        },
21    },
22    apimachinery::pkg::{
23        apis::meta::v1::{Condition, LabelSelector, Time},
24        util::intstr::IntOrString,
25    },
26    jiff::Timestamp,
27};
28use kube::{
29    Api, Client, Resource, ResourceExt,
30    api::{DeleteParams, ObjectMeta, PostParams},
31    runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
32};
33use maplit::btreemap;
34use tracing::{trace, warn};
35
36use crate::{
37    Error,
38    k8s::{apply_resource, get_resource, replace_resource},
39    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
40};
41use mz_cloud_resources::crd::{
42    ManagedResource,
43    balancer::v1alpha1::{Balancer, Routing},
44    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
45};
46use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
47use mz_ore::{cli::KeyValueArg, instrument};
48
49pub struct Config {
50    pub enable_security_context: bool,
51    pub enable_prometheus_scrape_annotations: bool,
52
53    pub image_pull_policy: KubernetesImagePullPolicy,
54    pub scheduler_name: Option<String>,
55    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
56    pub balancerd_affinity: Option<Affinity>,
57    pub balancerd_tolerations: Option<Vec<Toleration>>,
58    pub balancerd_default_resources: Option<ResourceRequirements>,
59
60    pub default_certificate_specs: DefaultCertificateSpecs,
61
62    pub environmentd_sql_port: u16,
63    pub environmentd_http_port: u16,
64    pub balancerd_sql_port: u16,
65    pub balancerd_http_port: u16,
66    pub balancerd_internal_http_port: u16,
67}
68
69pub struct Context {
70    config: Config,
71}
72
73impl Context {
74    pub fn new(config: Config) -> Self {
75        Self { config }
76    }
77
78    async fn sync_deployment_status(
79        &self,
80        client: &Client,
81        balancer: &Balancer,
82    ) -> Result<(), Error> {
83        let namespace = balancer.namespace();
84        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
85        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
86
87        let Some(deployment) = get_resource(&deployment_api, &balancer.deployment_name()).await?
88        else {
89            return Ok(());
90        };
91
92        let Some(deployment_conditions) = &deployment
93            .status
94            .as_ref()
95            .and_then(|status| status.conditions.as_ref())
96        else {
97            // if the deployment doesn't have any conditions set yet, there
98            // is nothing to sync
99            return Ok(());
100        };
101
102        let ready = deployment_conditions
103            .iter()
104            .any(|condition| condition.type_ == "Available" && condition.status == "True");
105        let ready_str = if ready { "True" } else { "False" };
106
107        let mut status = balancer.status.clone().unwrap();
108        if status
109            .conditions
110            .iter()
111            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
112        {
113            // if the deployment status is already set correctly, we don't
114            // need to set it again (this prevents us from getting stuck in
115            // a reconcile loop)
116            return Ok(());
117        }
118
119        status.conditions = vec![Condition {
120            type_: "Ready".to_string(),
121            status: ready_str.to_string(),
122            last_transition_time: Time(Timestamp::now()),
123            message: format!(
124                "balancerd deployment is{} ready",
125                if ready { "" } else { " not" }
126            ),
127            observed_generation: None,
128            reason: "DeploymentStatus".to_string(),
129        }];
130        let mut new_balancer = balancer.clone();
131        new_balancer.status = Some(status);
132
133        balancer_api
134            .replace_status(
135                &balancer.name_unchecked(),
136                &PostParams::default(),
137                &new_balancer,
138            )
139            .await?;
140
141        Ok(())
142    }
143
144    fn create_external_certificate_object(
145        &self,
146        balancer: &Balancer,
147    ) -> Result<Option<Certificate>, anyhow::Error> {
148        create_certificate(
149            self.config
150                .default_certificate_specs
151                .balancerd_external
152                .clone(),
153            balancer,
154            balancer.spec.external_certificate_spec.clone(),
155            balancer.external_certificate_name(),
156            balancer.external_certificate_secret_name(),
157            None,
158            CertificatePrivateKeyAlgorithm::Ecdsa,
159            Some(256),
160        )
161    }
162
163    fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
164        let security_context = if self.config.enable_security_context {
165            // Since we want to adhere to the most restrictive security context, all
166            // of these fields have to be set how they are.
167            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
168            Some(SecurityContext {
169                run_as_non_root: Some(true),
170                capabilities: Some(Capabilities {
171                    drop: Some(vec!["ALL".to_string()]),
172                    ..Default::default()
173                }),
174                seccomp_profile: Some(SeccompProfile {
175                    type_: "RuntimeDefault".to_string(),
176                    ..Default::default()
177                }),
178                allow_privilege_escalation: Some(false),
179                ..Default::default()
180            })
181        } else {
182            None
183        };
184
185        let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
186            Some(btreemap! {
187                "prometheus.io/scrape".to_owned() => "true".to_string(),
188                "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
189                "prometheus.io/path".to_owned() => "/metrics".to_string(),
190                "prometheus.io/scheme".to_owned() => "http".to_string(),
191            })
192        } else {
193            None
194        };
195        let mut pod_template_labels = balancer.default_labels();
196        pod_template_labels.insert(
197            "materialize.cloud/name".to_owned(),
198            balancer.deployment_name(),
199        );
200        pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
201        pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
202
203        let ports = vec![
204            ContainerPort {
205                container_port: self.config.balancerd_sql_port.into(),
206                name: Some("pgwire".into()),
207                protocol: Some("TCP".into()),
208                ..Default::default()
209            },
210            ContainerPort {
211                container_port: self.config.balancerd_http_port.into(),
212                name: Some("http".into()),
213                protocol: Some("TCP".into()),
214                ..Default::default()
215            },
216            ContainerPort {
217                container_port: self.config.balancerd_internal_http_port.into(),
218                name: Some("internal-http".into()),
219                protocol: Some("TCP".into()),
220                ..Default::default()
221            },
222        ];
223
224        let mut args = vec![
225            "service".to_string(),
226            format!(
227                "--pgwire-listen-addr=0.0.0.0:{}",
228                self.config.balancerd_sql_port
229            ),
230            format!(
231                "--https-listen-addr=0.0.0.0:{}",
232                self.config.balancerd_http_port
233            ),
234            format!(
235                "--internal-http-listen-addr=0.0.0.0:{}",
236                self.config.balancerd_internal_http_port
237            ),
238        ];
239        match balancer.routing()? {
240            Routing::Static(static_routing_config) => {
241                args.extend([
242                    format!(
243                        "--https-resolver-template={}.{}.svc.cluster.local:{}",
244                        static_routing_config.environmentd_service_name,
245                        static_routing_config.environmentd_namespace,
246                        self.config.environmentd_http_port
247                    ),
248                    format!(
249                        "--static-resolver-addr={}.{}.svc.cluster.local:{}",
250                        static_routing_config.environmentd_service_name,
251                        static_routing_config.environmentd_namespace,
252                        self.config.environmentd_sql_port
253                    ),
254                ]);
255            }
256            Routing::Frontegg(_frontegg_routing_config) => {
257                bail!("frontegg routing is not yet implemented");
258            }
259        }
260
261        if issuer_ref_defined(
262            &self.config.default_certificate_specs.internal,
263            &balancer.spec.internal_certificate_spec,
264        ) {
265            args.push("--internal-tls".to_owned())
266        }
267
268        let mut volumes = Vec::new();
269        let mut volume_mounts = Vec::new();
270        if issuer_ref_defined(
271            &self.config.default_certificate_specs.balancerd_external,
272            &balancer.spec.external_certificate_spec,
273        ) {
274            volumes.push(Volume {
275                name: "external-certificate".to_owned(),
276                secret: Some(SecretVolumeSource {
277                    default_mode: Some(0o400),
278                    secret_name: Some(balancer.external_certificate_secret_name()),
279                    items: None,
280                    optional: Some(false),
281                }),
282                ..Default::default()
283            });
284            volume_mounts.push(VolumeMount {
285                name: "external-certificate".to_owned(),
286                mount_path: "/etc/external_tls".to_owned(),
287                read_only: Some(true),
288                ..Default::default()
289            });
290            args.extend([
291                "--tls-mode=require".into(),
292                "--tls-cert=/etc/external_tls/tls.crt".into(),
293                "--tls-key=/etc/external_tls/tls.key".into(),
294            ]);
295        } else {
296            args.push("--tls-mode=disable".to_string());
297        }
298
299        let startup_probe = Probe {
300            http_get: Some(HTTPGetAction {
301                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
302                path: Some("/api/readyz".into()),
303                ..Default::default()
304            }),
305            failure_threshold: Some(20),
306            initial_delay_seconds: Some(3),
307            period_seconds: Some(3),
308            success_threshold: Some(1),
309            timeout_seconds: Some(1),
310            ..Default::default()
311        };
312        let readiness_probe = Probe {
313            http_get: Some(HTTPGetAction {
314                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
315                path: Some("/api/readyz".into()),
316                ..Default::default()
317            }),
318            failure_threshold: Some(3),
319            period_seconds: Some(10),
320            success_threshold: Some(1),
321            timeout_seconds: Some(1),
322            ..Default::default()
323        };
324        let liveness_probe = Probe {
325            http_get: Some(HTTPGetAction {
326                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
327                path: Some("/api/livez".into()),
328                ..Default::default()
329            }),
330            failure_threshold: Some(3),
331            initial_delay_seconds: Some(8),
332            period_seconds: Some(10),
333            success_threshold: Some(1),
334            timeout_seconds: Some(1),
335            ..Default::default()
336        };
337
338        let container = Container {
339            name: "balancerd".to_owned(),
340            image: Some(balancer.spec.balancerd_image_ref.clone()),
341            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
342            ports: Some(ports),
343            args: Some(args),
344            startup_probe: Some(startup_probe),
345            readiness_probe: Some(readiness_probe),
346            liveness_probe: Some(liveness_probe),
347            resources: balancer
348                .spec
349                .resource_requirements
350                .clone()
351                .or_else(|| self.config.balancerd_default_resources.clone()),
352            security_context: security_context.clone(),
353            volume_mounts: Some(volume_mounts),
354            ..Default::default()
355        };
356
357        let deployment_spec = DeploymentSpec {
358            replicas: Some(balancer.replicas()),
359            selector: LabelSelector {
360                match_labels: Some(pod_template_labels.clone()),
361                ..Default::default()
362            },
363            strategy: Some(DeploymentStrategy {
364                rolling_update: Some(RollingUpdateDeployment {
365                    // Allow a complete set of new pods at once, to minimize the
366                    // chances of a new connection going to a pod that will be
367                    // immediately drained
368                    max_surge: Some(IntOrString::String("100%".into())),
369                    ..Default::default()
370                }),
371                ..Default::default()
372            }),
373            template: PodTemplateSpec {
374                // not using managed_resource_meta because the pod should be
375                // owned by the deployment, not the materialize instance
376                metadata: Some(ObjectMeta {
377                    annotations: pod_template_annotations,
378                    labels: Some(pod_template_labels),
379                    ..Default::default()
380                }),
381                spec: Some(PodSpec {
382                    containers: vec![container],
383                    node_selector: Some(
384                        self.config
385                            .balancerd_node_selector
386                            .iter()
387                            .map(|selector| (selector.key.clone(), selector.value.clone()))
388                            .collect(),
389                    ),
390                    affinity: self.config.balancerd_affinity.clone(),
391                    tolerations: self.config.balancerd_tolerations.clone(),
392                    security_context: Some(PodSecurityContext {
393                        fs_group: Some(999),
394                        run_as_user: Some(999),
395                        run_as_group: Some(999),
396                        ..Default::default()
397                    }),
398                    scheduler_name: self.config.scheduler_name.clone(),
399                    volumes: Some(volumes),
400                    ..Default::default()
401                }),
402            },
403            ..Default::default()
404        };
405
406        Ok(Deployment {
407            metadata: balancer.managed_resource_meta(balancer.deployment_name()),
408            spec: Some(deployment_spec),
409            status: None,
410        })
411    }
412
413    fn create_service_object(&self, balancer: &Balancer) -> Service {
414        let selector =
415            btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
416
417        let ports = vec![
418            ServicePort {
419                name: Some("http".to_string()),
420                protocol: Some("TCP".to_string()),
421                port: self.config.balancerd_http_port.into(),
422                target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
423                ..Default::default()
424            },
425            ServicePort {
426                name: Some("pgwire".to_string()),
427                protocol: Some("TCP".to_string()),
428                port: self.config.balancerd_sql_port.into(),
429                target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
430                ..Default::default()
431            },
432        ];
433
434        let spec = ServiceSpec {
435            type_: Some("ClusterIP".to_string()),
436            cluster_ip: Some("None".to_string()),
437            selector: Some(selector),
438            ports: Some(ports),
439            ..Default::default()
440        };
441
442        Service {
443            metadata: balancer.managed_resource_meta(balancer.service_name()),
444            spec: Some(spec),
445            status: None,
446        }
447    }
448
449    // TODO: remove this once everyone is upgraded to an orchestratord
450    // version with the separate balancer operator
451    async fn fix_deployment(
452        &self,
453        deployment_api: &Api<Deployment>,
454        new_deployment: &Deployment,
455    ) -> Result<(), Error> {
456        let Some(mut existing_deployment) =
457            get_resource(deployment_api, &new_deployment.name_unchecked()).await?
458        else {
459            return Ok(());
460        };
461
462        if existing_deployment.spec.as_ref().unwrap().selector
463            == new_deployment.spec.as_ref().unwrap().selector
464        {
465            return Ok(());
466        }
467
468        warn!("found existing deployment with old label selector, fixing");
469
470        // this is sufficient because the new labels are a superset of the
471        // old labels, so the existing label selector should still be valid
472        existing_deployment
473            .spec
474            .as_mut()
475            .unwrap()
476            .template
477            .metadata
478            .as_mut()
479            .unwrap()
480            .labels = new_deployment
481            .spec
482            .as_ref()
483            .unwrap()
484            .template
485            .metadata
486            .as_ref()
487            .unwrap()
488            .labels
489            .clone();
490
491        // using await_condition is not ideal in a controller loop, but this
492        // is very temporary and will only ever happen once, so this feels
493        // simpler than trying to introduce an entire state machine here
494        replace_resource(deployment_api, &existing_deployment).await?;
495        await_condition(
496            deployment_api.clone(),
497            &existing_deployment.name_unchecked(),
498            |deployment: Option<&Deployment>| {
499                let observed_generation = deployment
500                    .and_then(|deployment| deployment.status.as_ref())
501                    .and_then(|status| status.observed_generation)
502                    .unwrap_or(0);
503                let current_generation = deployment
504                    .and_then(|deployment| deployment.meta().generation)
505                    .unwrap_or(0);
506                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
507                observed_generation == current_generation
508                    && current_generation > previous_generation
509            },
510        )
511        .await
512        .map_err(|e| anyhow::anyhow!(e))?;
513        await_condition(
514            deployment_api.clone(),
515            &existing_deployment.name_unchecked(),
516            is_deployment_completed(),
517        )
518        .await
519        .map_err(|e| anyhow::anyhow!(e))?;
520
521        // delete the deployment but leave the pods around (via
522        // DeleteParams::orphan)
523        match kube::runtime::wait::delete::delete_and_finalize(
524            deployment_api.clone(),
525            &existing_deployment.name_unchecked(),
526            &DeleteParams::orphan(),
527        )
528        .await
529        {
530            Ok(_) => {}
531            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
532                if e.code == 404 =>
533            {
534                // the resource already doesn't exist
535            }
536            Err(e) => return Err(anyhow::anyhow!(e).into()),
537        }
538
539        // now, the normal apply of the new deployment (in the main loop)
540        // will take over the existing pods from the old deployment we just
541        // deleted, since we already updated the pod labels to be the same as
542        // the new label selector
543
544        Ok(())
545    }
546}
547
548#[async_trait::async_trait]
549impl k8s_controller::Context for Context {
550    type Resource = Balancer;
551    type Error = Error;
552
553    #[instrument(fields())]
554    async fn apply(
555        &self,
556        client: Client,
557        balancer: &Self::Resource,
558        _metadata: &mut TraceMetadata,
559    ) -> Result<Option<Action>, Self::Error> {
560        if balancer.status.is_none() {
561            let balancer_api: Api<Balancer> =
562                Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
563            let mut new_balancer = balancer.clone();
564            new_balancer.status = Some(balancer.status());
565            balancer_api
566                .replace_status(
567                    &balancer.name_unchecked(),
568                    &PostParams::default(),
569                    &new_balancer,
570                )
571                .await?;
572            // Updating the status should trigger a reconciliation
573            // which will include a status this time.
574            return Ok(None);
575        }
576
577        let namespace = balancer.namespace();
578        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
579        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
580        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
581
582        if let Some(external_certificate) = self.create_external_certificate_object(balancer)? {
583            trace!("creating new balancerd external certificate");
584            apply_resource(&certificate_api, &external_certificate).await?;
585        }
586
587        let deployment = self.create_deployment_object(balancer)?;
588        self.fix_deployment(&deployment_api, &deployment).await?;
589        trace!("creating new balancerd deployment");
590        apply_resource(&deployment_api, &deployment).await?;
591
592        let service = self.create_service_object(balancer);
593        trace!("creating new balancerd service");
594        apply_resource(&service_api, &service).await?;
595
596        self.sync_deployment_status(&client, balancer).await?;
597
598        Ok(None)
599    }
600}