Skip to main content

mz_orchestratord/controller/
balancer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::bail;
11use k8s_openapi::{
12    api::{
13        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
14        core::v1::{
15            Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
16            PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
17            SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
18            Volume, VolumeMount,
19        },
20    },
21    apimachinery::pkg::{
22        apis::meta::v1::{Condition, LabelSelector, Time},
23        util::intstr::IntOrString,
24    },
25    jiff::Timestamp,
26};
27use kube::{
28    Api, Client, Resource, ResourceExt,
29    api::{DeleteParams, ObjectMeta, PostParams},
30    runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
31};
32use maplit::btreemap;
33use tracing::{trace, warn};
34
35use crate::{
36    Error,
37    k8s::{apply_resource, get_resource, replace_resource},
38    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
39};
40use mz_cloud_resources::crd::{
41    ManagedResource,
42    balancer::v1alpha1::{Balancer, Routing},
43    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
44};
45use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
46use mz_ore::{cli::KeyValueArg, instrument};
47
/// Static settings used by the balancer controller when it generates
/// Kubernetes objects for a `Balancer`.
pub struct Config {
    /// When set, the balancerd container is given a restrictive
    /// `SecurityContext`; otherwise the container has none.
    pub enable_security_context: bool,
    /// When set, `prometheus.io/*` scrape annotations are added to the
    /// balancerd pod template.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy written onto the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name copied onto the balancerd pod spec.
    pub scheduler_name: Option<String>,
    /// Key/value pairs collected into the balancerd pod's node selector.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied onto the balancerd pod spec.
    pub balancerd_affinity: Option<Affinity>,
    /// Tolerations copied onto the balancerd pod spec.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Container resources used when the `Balancer` spec does not provide
    /// its own resource requirements.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Default certificate specs; the `balancerd_external` and `internal`
    /// entries are consulted when configuring TLS for balancerd.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port used in the `--static-resolver-addr` argument passed to balancerd.
    pub environmentd_sql_port: u16,
    /// Port used in the `--https-resolver-template` argument passed to
    /// balancerd.
    pub environmentd_http_port: u16,
    /// Port balancerd listens on for pgwire; also exposed by the service.
    pub balancerd_sql_port: u16,
    /// Port balancerd listens on for HTTPS; also exposed by the service.
    pub balancerd_http_port: u16,
    /// Port balancerd listens on for internal HTTP; used by the probes and
    /// the prometheus port annotation.
    pub balancerd_internal_http_port: u16,
}
67
/// Controller context for `Balancer` resources; owns the [`Config`] that
/// every generated object is derived from.
pub struct Context {
    config: Config,
}
71
72impl Context {
    /// Creates a context that reconciles balancers using `config`.
    pub fn new(config: Config) -> Self {
        Self { config }
    }
76
77    async fn sync_deployment_status(
78        &self,
79        client: &Client,
80        balancer: &Balancer,
81    ) -> Result<(), Error> {
82        let namespace = balancer.namespace();
83        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
84        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
85
86        let Some(deployment) = get_resource(&deployment_api, &balancer.deployment_name()).await?
87        else {
88            return Ok(());
89        };
90
91        let Some(deployment_conditions) = &deployment
92            .status
93            .as_ref()
94            .and_then(|status| status.conditions.as_ref())
95        else {
96            // if the deployment doesn't have any conditions set yet, there
97            // is nothing to sync
98            return Ok(());
99        };
100
101        let ready = deployment_conditions
102            .iter()
103            .any(|condition| condition.type_ == "Available" && condition.status == "True");
104        let ready_str = if ready { "True" } else { "False" };
105
106        let mut status = balancer.status.clone().unwrap();
107        if status
108            .conditions
109            .iter()
110            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
111        {
112            // if the deployment status is already set correctly, we don't
113            // need to set it again (this prevents us from getting stuck in
114            // a reconcile loop)
115            return Ok(());
116        }
117
118        status.conditions = vec![Condition {
119            type_: "Ready".to_string(),
120            status: ready_str.to_string(),
121            last_transition_time: Time(Timestamp::now()),
122            message: format!(
123                "balancerd deployment is{} ready",
124                if ready { "" } else { " not" }
125            ),
126            observed_generation: None,
127            reason: "DeploymentStatus".to_string(),
128        }];
129        let mut new_balancer = balancer.clone();
130        new_balancer.status = Some(status);
131
132        balancer_api
133            .replace_status(
134                &balancer.name_unchecked(),
135                &PostParams::default(),
136                &new_balancer,
137            )
138            .await?;
139
140        Ok(())
141    }
142
143    fn create_external_certificate_object(
144        &self,
145        balancer: &Balancer,
146    ) -> Result<Option<Certificate>, anyhow::Error> {
147        create_certificate(
148            self.config
149                .default_certificate_specs
150                .balancerd_external
151                .clone(),
152            balancer,
153            balancer.spec.external_certificate_spec.clone(),
154            balancer.external_certificate_name(),
155            balancer.external_certificate_secret_name(),
156            None,
157            CertificatePrivateKeyAlgorithm::Ecdsa,
158            Some(256),
159        )
160    }
161
162    fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
163        let security_context = if self.config.enable_security_context {
164            // Since we want to adhere to the most restrictive security context, all
165            // of these fields have to be set how they are.
166            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
167            Some(SecurityContext {
168                run_as_non_root: Some(true),
169                capabilities: Some(Capabilities {
170                    drop: Some(vec!["ALL".to_string()]),
171                    ..Default::default()
172                }),
173                seccomp_profile: Some(SeccompProfile {
174                    type_: "RuntimeDefault".to_string(),
175                    ..Default::default()
176                }),
177                allow_privilege_escalation: Some(false),
178                ..Default::default()
179            })
180        } else {
181            None
182        };
183
184        let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
185            Some(btreemap! {
186                "prometheus.io/scrape".to_owned() => "true".to_string(),
187                "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
188                "prometheus.io/path".to_owned() => "/metrics".to_string(),
189                "prometheus.io/scheme".to_owned() => "http".to_string(),
190            })
191        } else {
192            None
193        };
194        let mut pod_template_labels = balancer.default_labels();
195        pod_template_labels.insert(
196            "materialize.cloud/name".to_owned(),
197            balancer.deployment_name(),
198        );
199        pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
200        pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
201
202        let ports = vec![
203            ContainerPort {
204                container_port: self.config.balancerd_sql_port.into(),
205                name: Some("pgwire".into()),
206                protocol: Some("TCP".into()),
207                ..Default::default()
208            },
209            ContainerPort {
210                container_port: self.config.balancerd_http_port.into(),
211                name: Some("http".into()),
212                protocol: Some("TCP".into()),
213                ..Default::default()
214            },
215            ContainerPort {
216                container_port: self.config.balancerd_internal_http_port.into(),
217                name: Some("internal-http".into()),
218                protocol: Some("TCP".into()),
219                ..Default::default()
220            },
221        ];
222
223        let mut args = vec![
224            "service".to_string(),
225            format!(
226                "--pgwire-listen-addr=0.0.0.0:{}",
227                self.config.balancerd_sql_port
228            ),
229            format!(
230                "--https-listen-addr=0.0.0.0:{}",
231                self.config.balancerd_http_port
232            ),
233            format!(
234                "--internal-http-listen-addr=0.0.0.0:{}",
235                self.config.balancerd_internal_http_port
236            ),
237        ];
238        match balancer.routing()? {
239            Routing::Static(static_routing_config) => {
240                args.extend([
241                    format!(
242                        "--https-resolver-template={}.{}.svc.cluster.local:{}",
243                        static_routing_config.environmentd_service_name,
244                        static_routing_config.environmentd_namespace,
245                        self.config.environmentd_http_port
246                    ),
247                    format!(
248                        "--static-resolver-addr={}.{}.svc.cluster.local:{}",
249                        static_routing_config.environmentd_service_name,
250                        static_routing_config.environmentd_namespace,
251                        self.config.environmentd_sql_port
252                    ),
253                ]);
254            }
255            Routing::Frontegg(_frontegg_routing_config) => {
256                bail!("frontegg routing is not yet implemented");
257            }
258        }
259
260        if issuer_ref_defined(
261            &self.config.default_certificate_specs.internal,
262            &balancer.spec.internal_certificate_spec,
263        ) {
264            args.push("--internal-tls".to_owned())
265        }
266
267        let mut volumes = Vec::new();
268        let mut volume_mounts = Vec::new();
269        if issuer_ref_defined(
270            &self.config.default_certificate_specs.balancerd_external,
271            &balancer.spec.external_certificate_spec,
272        ) {
273            volumes.push(Volume {
274                name: "external-certificate".to_owned(),
275                secret: Some(SecretVolumeSource {
276                    default_mode: Some(0o400),
277                    secret_name: Some(balancer.external_certificate_secret_name()),
278                    items: None,
279                    optional: Some(false),
280                }),
281                ..Default::default()
282            });
283            volume_mounts.push(VolumeMount {
284                name: "external-certificate".to_owned(),
285                mount_path: "/etc/external_tls".to_owned(),
286                read_only: Some(true),
287                ..Default::default()
288            });
289            args.extend([
290                "--tls-mode=require".into(),
291                "--tls-cert=/etc/external_tls/tls.crt".into(),
292                "--tls-key=/etc/external_tls/tls.key".into(),
293            ]);
294        } else {
295            args.push("--tls-mode=disable".to_string());
296        }
297
298        let startup_probe = Probe {
299            http_get: Some(HTTPGetAction {
300                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
301                path: Some("/api/readyz".into()),
302                ..Default::default()
303            }),
304            failure_threshold: Some(20),
305            initial_delay_seconds: Some(3),
306            period_seconds: Some(3),
307            success_threshold: Some(1),
308            timeout_seconds: Some(1),
309            ..Default::default()
310        };
311        let readiness_probe = Probe {
312            http_get: Some(HTTPGetAction {
313                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
314                path: Some("/api/readyz".into()),
315                ..Default::default()
316            }),
317            failure_threshold: Some(3),
318            period_seconds: Some(10),
319            success_threshold: Some(1),
320            timeout_seconds: Some(1),
321            ..Default::default()
322        };
323        let liveness_probe = Probe {
324            http_get: Some(HTTPGetAction {
325                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
326                path: Some("/api/livez".into()),
327                ..Default::default()
328            }),
329            failure_threshold: Some(3),
330            initial_delay_seconds: Some(8),
331            period_seconds: Some(10),
332            success_threshold: Some(1),
333            timeout_seconds: Some(1),
334            ..Default::default()
335        };
336
337        let container = Container {
338            name: "balancerd".to_owned(),
339            image: Some(balancer.spec.balancerd_image_ref.clone()),
340            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
341            ports: Some(ports),
342            args: Some(args),
343            startup_probe: Some(startup_probe),
344            readiness_probe: Some(readiness_probe),
345            liveness_probe: Some(liveness_probe),
346            resources: balancer
347                .spec
348                .resource_requirements
349                .clone()
350                .or_else(|| self.config.balancerd_default_resources.clone()),
351            security_context: security_context.clone(),
352            volume_mounts: Some(volume_mounts),
353            ..Default::default()
354        };
355
356        let deployment_spec = DeploymentSpec {
357            replicas: Some(balancer.replicas()),
358            selector: LabelSelector {
359                match_labels: Some(pod_template_labels.clone()),
360                ..Default::default()
361            },
362            strategy: Some(DeploymentStrategy {
363                rolling_update: Some(RollingUpdateDeployment {
364                    // Allow a complete set of new pods at once, to minimize the
365                    // chances of a new connection going to a pod that will be
366                    // immediately drained
367                    max_surge: Some(IntOrString::String("100%".into())),
368                    ..Default::default()
369                }),
370                ..Default::default()
371            }),
372            template: PodTemplateSpec {
373                // not using managed_resource_meta because the pod should be
374                // owned by the deployment, not the materialize instance
375                metadata: Some(ObjectMeta {
376                    annotations: pod_template_annotations,
377                    labels: Some(pod_template_labels),
378                    ..Default::default()
379                }),
380                spec: Some(PodSpec {
381                    containers: vec![container],
382                    node_selector: Some(
383                        self.config
384                            .balancerd_node_selector
385                            .iter()
386                            .map(|selector| (selector.key.clone(), selector.value.clone()))
387                            .collect(),
388                    ),
389                    affinity: self.config.balancerd_affinity.clone(),
390                    tolerations: self.config.balancerd_tolerations.clone(),
391                    security_context: Some(PodSecurityContext {
392                        fs_group: Some(999),
393                        run_as_user: Some(999),
394                        run_as_group: Some(999),
395                        ..Default::default()
396                    }),
397                    scheduler_name: self.config.scheduler_name.clone(),
398                    volumes: Some(volumes),
399                    ..Default::default()
400                }),
401            },
402            ..Default::default()
403        };
404
405        Ok(Deployment {
406            metadata: balancer.managed_resource_meta(balancer.deployment_name()),
407            spec: Some(deployment_spec),
408            status: None,
409        })
410    }
411
412    fn create_service_object(&self, balancer: &Balancer) -> Service {
413        let selector =
414            btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
415
416        let ports = vec![
417            ServicePort {
418                name: Some("http".to_string()),
419                protocol: Some("TCP".to_string()),
420                port: self.config.balancerd_http_port.into(),
421                target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
422                ..Default::default()
423            },
424            ServicePort {
425                name: Some("pgwire".to_string()),
426                protocol: Some("TCP".to_string()),
427                port: self.config.balancerd_sql_port.into(),
428                target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
429                ..Default::default()
430            },
431        ];
432
433        let spec = ServiceSpec {
434            type_: Some("ClusterIP".to_string()),
435            cluster_ip: Some("None".to_string()),
436            selector: Some(selector),
437            ports: Some(ports),
438            ..Default::default()
439        };
440
441        Service {
442            metadata: balancer.managed_resource_meta(balancer.service_name()),
443            spec: Some(spec),
444            status: None,
445        }
446    }
447
448    // TODO: remove this once everyone is upgraded to an orchestratord
449    // version with the separate balancer operator
450    async fn fix_deployment(
451        &self,
452        deployment_api: &Api<Deployment>,
453        new_deployment: &Deployment,
454    ) -> Result<(), Error> {
455        let Some(mut existing_deployment) =
456            get_resource(deployment_api, &new_deployment.name_unchecked()).await?
457        else {
458            return Ok(());
459        };
460
461        if existing_deployment.spec.as_ref().unwrap().selector
462            == new_deployment.spec.as_ref().unwrap().selector
463        {
464            return Ok(());
465        }
466
467        warn!("found existing deployment with old label selector, fixing");
468
469        // this is sufficient because the new labels are a superset of the
470        // old labels, so the existing label selector should still be valid
471        existing_deployment
472            .spec
473            .as_mut()
474            .unwrap()
475            .template
476            .metadata
477            .as_mut()
478            .unwrap()
479            .labels = new_deployment
480            .spec
481            .as_ref()
482            .unwrap()
483            .template
484            .metadata
485            .as_ref()
486            .unwrap()
487            .labels
488            .clone();
489
490        // using await_condition is not ideal in a controller loop, but this
491        // is very temporary and will only ever happen once, so this feels
492        // simpler than trying to introduce an entire state machine here
493        replace_resource(deployment_api, &existing_deployment).await?;
494        await_condition(
495            deployment_api.clone(),
496            &existing_deployment.name_unchecked(),
497            |deployment: Option<&Deployment>| {
498                let observed_generation = deployment
499                    .and_then(|deployment| deployment.status.as_ref())
500                    .and_then(|status| status.observed_generation)
501                    .unwrap_or(0);
502                let current_generation = deployment
503                    .and_then(|deployment| deployment.meta().generation)
504                    .unwrap_or(0);
505                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
506                observed_generation == current_generation
507                    && current_generation > previous_generation
508            },
509        )
510        .await
511        .map_err(|e| anyhow::anyhow!(e))?;
512        await_condition(
513            deployment_api.clone(),
514            &existing_deployment.name_unchecked(),
515            is_deployment_completed(),
516        )
517        .await
518        .map_err(|e| anyhow::anyhow!(e))?;
519
520        // delete the deployment but leave the pods around (via
521        // DeleteParams::orphan)
522        match kube::runtime::wait::delete::delete_and_finalize(
523            deployment_api.clone(),
524            &existing_deployment.name_unchecked(),
525            &DeleteParams::orphan(),
526        )
527        .await
528        {
529            Ok(_) => {}
530            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
531                if e.code == 404 =>
532            {
533                // the resource already doesn't exist
534            }
535            Err(e) => return Err(anyhow::anyhow!(e).into()),
536        }
537
538        // now, the normal apply of the new deployment (in the main loop)
539        // will take over the existing pods from the old deployment we just
540        // deleted, since we already updated the pod labels to be the same as
541        // the new label selector
542
543        Ok(())
544    }
545}
546
547#[async_trait::async_trait]
548impl k8s_controller::Context for Context {
549    type Resource = Balancer;
550    type Error = Error;
551
552    #[instrument(fields())]
553    async fn apply(
554        &self,
555        client: Client,
556        balancer: &Self::Resource,
557    ) -> Result<Option<Action>, Self::Error> {
558        if balancer.status.is_none() {
559            let balancer_api: Api<Balancer> =
560                Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
561            let mut new_balancer = balancer.clone();
562            new_balancer.status = Some(balancer.status());
563            balancer_api
564                .replace_status(
565                    &balancer.name_unchecked(),
566                    &PostParams::default(),
567                    &new_balancer,
568                )
569                .await?;
570            // Updating the status should trigger a reconciliation
571            // which will include a status this time.
572            return Ok(None);
573        }
574
575        let namespace = balancer.namespace();
576        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
577        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
578        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
579
580        if let Some(external_certificate) = self.create_external_certificate_object(balancer)? {
581            trace!("creating new balancerd external certificate");
582            apply_resource(&certificate_api, &external_certificate).await?;
583        }
584
585        let deployment = self.create_deployment_object(balancer)?;
586        self.fix_deployment(&deployment_api, &deployment).await?;
587        trace!("creating new balancerd deployment");
588        apply_resource(&deployment_api, &deployment).await?;
589
590        let service = self.create_service_object(balancer);
591        trace!("creating new balancerd service");
592        apply_resource(&service_api, &service).await?;
593
594        self.sync_deployment_status(&client, balancer).await?;
595
596        Ok(None)
597    }
598}