// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// mz_orchestratord/controller/balancer.rs
use std::sync::Arc;

use anyhow::bail;
use k8s_openapi::{
    api::{
        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
        core::v1::{
            Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
            PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
            SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
            Volume, VolumeMount,
        },
    },
    apimachinery::pkg::{
        apis::meta::v1::{Condition, LabelSelector, Time},
        util::intstr::IntOrString,
    },
    jiff::Timestamp,
};
use kube::{
    Api, Client, Resource, ResourceExt,
    api::{DeleteParams, ObjectMeta, PostParams},
    runtime::{
        conditions::is_deployment_completed,
        controller::Action,
        reflector::{ObjectRef, Store},
        wait::await_condition,
    },
};
use maplit::btreemap;
use tracing::{trace, warn};

use crate::{
    Error,
    k8s::{apply_resource, make_reflector, replace_resource},
    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
};
use mz_cloud_resources::crd::{
    ManagedResource,
    balancer::v1alpha1::{Balancer, Routing},
    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
};
use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
use mz_ore::{cli::KeyValueArg, instrument};

/// Static configuration for the balancer controller, supplied once at
/// orchestratord startup and shared by all reconciles.
pub struct Config {
    /// Whether to set a restricted security context on the balancerd
    /// container (Kubernetes "restricted" pod security standard).
    pub enable_security_context: bool,
    /// Whether to add `prometheus.io/*` scrape annotations to balancerd
    /// pod templates (pointing at `/metrics` on the internal HTTP port).
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy for the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Optional scheduler name for balancerd pods.
    pub scheduler_name: Option<String>,
    /// Node selector key/value pairs applied to balancerd pods.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Optional affinity rules for balancerd pods.
    pub balancerd_affinity: Option<Affinity>,
    /// Optional tolerations for balancerd pods.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Resource requests/limits used when a `Balancer` resource does not
    /// specify its own `resource_requirements`.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Operator-level default cert-manager certificate specs (internal and
    /// balancerd-external), merged with per-resource specs.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port environmentd serves SQL (pgwire) on; target of balancerd's
    /// static resolver address.
    pub environmentd_sql_port: u16,
    /// Port environmentd serves HTTPS on; target of balancerd's HTTPS
    /// resolver template.
    pub environmentd_http_port: u16,
    /// Port balancerd listens on for SQL (pgwire) connections.
    pub balancerd_sql_port: u16,
    /// Port balancerd listens on for HTTPS connections.
    pub balancerd_http_port: u16,
    /// Port balancerd serves internal HTTP on (metrics and the
    /// readiness/liveness probe endpoints).
    pub balancerd_internal_http_port: u16,
}

/// Reconciliation context shared across all `Balancer` resources handled
/// by this controller.
pub struct Context {
    // Static controller configuration (see `Config`).
    config: Config,
    // Reflector-backed local cache of `Deployment` objects, so reconciles
    // can read deployment state without a round-trip to the API server.
    deployments: Store<Deployment>,
}

impl Context {
81    pub async fn new(config: Config, client: Client) -> Self {
82        Self {
83            config,
84            deployments: make_reflector(client).await,
85        }
86    }
87
88    async fn sync_deployment_status(
89        &self,
90        client: &Client,
91        balancer: &Balancer,
92    ) -> Result<(), kube::Error> {
93        let namespace = balancer.namespace();
94        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
95
96        let Some(deployment) = self
97            .deployments
98            .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
99        else {
100            return Ok(());
101        };
102
103        let Some(deployment_conditions) = &deployment
104            .status
105            .as_ref()
106            .and_then(|status| status.conditions.as_ref())
107        else {
108            // if the deployment doesn't have any conditions set yet, there
109            // is nothing to sync
110            return Ok(());
111        };
112
113        let ready = deployment_conditions
114            .iter()
115            .any(|condition| condition.type_ == "Available" && condition.status == "True");
116        let ready_str = if ready { "True" } else { "False" };
117
118        let mut status = balancer.status.clone().unwrap();
119        if status
120            .conditions
121            .iter()
122            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
123        {
124            // if the deployment status is already set correctly, we don't
125            // need to set it again (this prevents us from getting stuck in
126            // a reconcile loop)
127            return Ok(());
128        }
129
130        status.conditions = vec![Condition {
131            type_: "Ready".to_string(),
132            status: ready_str.to_string(),
133            last_transition_time: Time(Timestamp::now()),
134            message: format!(
135                "balancerd deployment is{} ready",
136                if ready { "" } else { " not" }
137            ),
138            observed_generation: None,
139            reason: "DeploymentStatus".to_string(),
140        }];
141        let mut new_balancer = balancer.clone();
142        new_balancer.status = Some(status);
143
144        balancer_api
145            .replace_status(
146                &balancer.name_unchecked(),
147                &PostParams::default(),
148                &new_balancer,
149            )
150            .await?;
151
152        Ok(())
153    }
154
155    fn create_external_certificate_object(&self, balancer: &Balancer) -> Option<Certificate> {
156        create_certificate(
157            self.config
158                .default_certificate_specs
159                .balancerd_external
160                .clone(),
161            balancer,
162            balancer.spec.external_certificate_spec.clone(),
163            balancer.external_certificate_name(),
164            balancer.external_certificate_secret_name(),
165            None,
166            CertificatePrivateKeyAlgorithm::Rsa,
167            Some(4096),
168        )
169    }
170
    /// Builds the balancerd `Deployment` object for the given `Balancer`.
    ///
    /// # Errors
    ///
    /// Fails when `balancer.routing()` fails, or when the balancer is
    /// configured for Frontegg routing, which is not yet implemented.
    fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
        // Container-level security hardening is opt-in via config.
        let security_context = if self.config.enable_security_context {
            // Since we want to adhere to the most restrictive security context, all
            // of these fields have to be set how they are.
            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
            Some(SecurityContext {
                run_as_non_root: Some(true),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                allow_privilege_escalation: Some(false),
                ..Default::default()
            })
        } else {
            None
        };

        // Optionally annotate pods so Prometheus scrapes /metrics on the
        // internal HTTP port.
        let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
            Some(btreemap! {
                "prometheus.io/scrape".to_owned() => "true".to_string(),
                "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
                "prometheus.io/path".to_owned() => "/metrics".to_string(),
                "prometheus.io/scheme".to_owned() => "http".to_string(),
            })
        } else {
            None
        };
        // Pod labels double as the deployment's label selector (see
        // `deployment_spec.selector` below).
        let mut pod_template_labels = balancer.default_labels();
        pod_template_labels.insert(
            "materialize.cloud/name".to_owned(),
            balancer.deployment_name(),
        );
        pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
        pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());

        // The three listeners balancerd is started with (see `args` below).
        let ports = vec![
            ContainerPort {
                container_port: self.config.balancerd_sql_port.into(),
                name: Some("pgwire".into()),
                protocol: Some("TCP".into()),
                ..Default::default()
            },
            ContainerPort {
                container_port: self.config.balancerd_http_port.into(),
                name: Some("http".into()),
                protocol: Some("TCP".into()),
                ..Default::default()
            },
            ContainerPort {
                container_port: self.config.balancerd_internal_http_port.into(),
                name: Some("internal-http".into()),
                protocol: Some("TCP".into()),
                ..Default::default()
            },
        ];

        let mut args = vec![
            "service".to_string(),
            format!(
                "--pgwire-listen-addr=0.0.0.0:{}",
                self.config.balancerd_sql_port
            ),
            format!(
                "--https-listen-addr=0.0.0.0:{}",
                self.config.balancerd_http_port
            ),
            format!(
                "--internal-http-listen-addr=0.0.0.0:{}",
                self.config.balancerd_internal_http_port
            ),
        ];
        // Configure how balancerd resolves the backing environmentd.
        match balancer.routing()? {
            Routing::Static(static_routing_config) => {
                // Point both resolvers at the environmentd service's
                // in-cluster DNS name.
                args.extend([
                    format!(
                        "--https-resolver-template={}.{}.svc.cluster.local:{}",
                        static_routing_config.environmentd_service_name,
                        static_routing_config.environmentd_namespace,
                        self.config.environmentd_http_port
                    ),
                    format!(
                        "--static-resolver-addr={}.{}.svc.cluster.local:{}",
                        static_routing_config.environmentd_service_name,
                        static_routing_config.environmentd_namespace,
                        self.config.environmentd_sql_port
                    ),
                ]);
            }
            Routing::Frontegg(_frontegg_routing_config) => {
                bail!("frontegg routing is not yet implemented");
            }
        }

        // Enable TLS to environmentd only when an internal issuer is
        // configured (either operator-wide or on this resource).
        if issuer_ref_defined(
            &self.config.default_certificate_specs.internal,
            &balancer.spec.internal_certificate_spec,
        ) {
            args.push("--internal-tls".to_owned())
        }

        let mut volumes = Vec::new();
        let mut volume_mounts = Vec::new();
        if issuer_ref_defined(
            &self.config.default_certificate_specs.balancerd_external,
            &balancer.spec.external_certificate_spec,
        ) {
            // Mount the external certificate secret (created via
            // `create_external_certificate_object`) read-only and require
            // TLS from clients.
            volumes.push(Volume {
                name: "external-certificate".to_owned(),
                secret: Some(SecretVolumeSource {
                    default_mode: Some(0o400),
                    secret_name: Some(balancer.external_certificate_secret_name()),
                    items: None,
                    optional: Some(false),
                }),
                ..Default::default()
            });
            volume_mounts.push(VolumeMount {
                name: "external-certificate".to_owned(),
                mount_path: "/etc/external_tls".to_owned(),
                read_only: Some(true),
                ..Default::default()
            });
            args.extend([
                "--tls-mode=require".into(),
                "--tls-cert=/etc/external_tls/tls.crt".into(),
                "--tls-key=/etc/external_tls/tls.key".into(),
            ]);
        } else {
            args.push("--tls-mode=disable".to_string());
        }

        // All probes hit balancerd's internal HTTP listener. The startup
        // probe polls quickly (every 3s, up to 20 failures) to pass control
        // to the readiness/liveness probes as soon as possible.
        let startup_probe = Probe {
            http_get: Some(HTTPGetAction {
                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
                path: Some("/api/readyz".into()),
                ..Default::default()
            }),
            failure_threshold: Some(20),
            initial_delay_seconds: Some(3),
            period_seconds: Some(3),
            success_threshold: Some(1),
            timeout_seconds: Some(1),
            ..Default::default()
        };
        let readiness_probe = Probe {
            http_get: Some(HTTPGetAction {
                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
                path: Some("/api/readyz".into()),
                ..Default::default()
            }),
            failure_threshold: Some(3),
            period_seconds: Some(10),
            success_threshold: Some(1),
            timeout_seconds: Some(1),
            ..Default::default()
        };
        let liveness_probe = Probe {
            http_get: Some(HTTPGetAction {
                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
                path: Some("/api/livez".into()),
                ..Default::default()
            }),
            failure_threshold: Some(3),
            initial_delay_seconds: Some(8),
            period_seconds: Some(10),
            success_threshold: Some(1),
            timeout_seconds: Some(1),
            ..Default::default()
        };

        let container = Container {
            name: "balancerd".to_owned(),
            image: Some(balancer.spec.balancerd_image_ref.clone()),
            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
            ports: Some(ports),
            args: Some(args),
            startup_probe: Some(startup_probe),
            readiness_probe: Some(readiness_probe),
            liveness_probe: Some(liveness_probe),
            // Per-resource requirements win over the operator default.
            resources: balancer
                .spec
                .resource_requirements
                .clone()
                .or_else(|| self.config.balancerd_default_resources.clone()),
            security_context: security_context.clone(),
            volume_mounts: Some(volume_mounts),
            ..Default::default()
        };

        let deployment_spec = DeploymentSpec {
            replicas: Some(balancer.replicas()),
            selector: LabelSelector {
                match_labels: Some(pod_template_labels.clone()),
                ..Default::default()
            },
            strategy: Some(DeploymentStrategy {
                rolling_update: Some(RollingUpdateDeployment {
                    // Allow a complete set of new pods at once, to minimize the
                    // chances of a new connection going to a pod that will be
                    // immediately drained
                    max_surge: Some(IntOrString::String("100%".into())),
                    ..Default::default()
                }),
                ..Default::default()
            }),
            template: PodTemplateSpec {
                // not using managed_resource_meta because the pod should be
                // owned by the deployment, not the materialize instance
                metadata: Some(ObjectMeta {
                    annotations: pod_template_annotations,
                    labels: Some(pod_template_labels),
                    ..Default::default()
                }),
                spec: Some(PodSpec {
                    containers: vec![container],
                    node_selector: Some(
                        self.config
                            .balancerd_node_selector
                            .iter()
                            .map(|selector| (selector.key.clone(), selector.value.clone()))
                            .collect(),
                    ),
                    affinity: self.config.balancerd_affinity.clone(),
                    tolerations: self.config.balancerd_tolerations.clone(),
                    // NOTE(review): the pod-level security context is set
                    // unconditionally, even when `enable_security_context`
                    // is false (only the container-level context is gated)
                    // — confirm this asymmetry is intentional.
                    security_context: Some(PodSecurityContext {
                        fs_group: Some(999),
                        run_as_user: Some(999),
                        run_as_group: Some(999),
                        ..Default::default()
                    }),
                    scheduler_name: self.config.scheduler_name.clone(),
                    volumes: Some(volumes),
                    ..Default::default()
                }),
            },
            ..Default::default()
        };

        Ok(Deployment {
            metadata: balancer.managed_resource_meta(balancer.deployment_name()),
            spec: Some(deployment_spec),
            status: None,
        })
    }

421    fn create_service_object(&self, balancer: &Balancer) -> Service {
422        let selector =
423            btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
424
425        let ports = vec![
426            ServicePort {
427                name: Some("http".to_string()),
428                protocol: Some("TCP".to_string()),
429                port: self.config.balancerd_http_port.into(),
430                target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
431                ..Default::default()
432            },
433            ServicePort {
434                name: Some("pgwire".to_string()),
435                protocol: Some("TCP".to_string()),
436                port: self.config.balancerd_sql_port.into(),
437                target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
438                ..Default::default()
439            },
440        ];
441
442        let spec = ServiceSpec {
443            type_: Some("ClusterIP".to_string()),
444            cluster_ip: Some("None".to_string()),
445            selector: Some(selector),
446            ports: Some(ports),
447            ..Default::default()
448        };
449
450        Service {
451            metadata: balancer.managed_resource_meta(balancer.service_name()),
452            spec: Some(spec),
453            status: None,
454        }
455    }
456
    // TODO: remove this once everyone is upgraded to an orchestratord
    // version with the separate balancer operator
    /// Migrates a pre-existing balancerd deployment whose label selector
    /// differs from the one we now generate.
    ///
    /// A deployment's `.spec.selector` is immutable, so instead of patching
    /// it in place we: (1) rewrite the old deployment's pod template labels
    /// to match the new selector and wait for that rollout to complete,
    /// then (2) delete the old deployment with orphan semantics so its pods
    /// keep running and are adopted by the new deployment applied in the
    /// main reconcile loop.
    ///
    /// No-op when no old deployment is cached or its selector already
    /// matches.
    async fn fix_deployment(
        &self,
        deployment_api: &Api<Deployment>,
        new_deployment: &Deployment,
    ) -> Result<(), Error> {
        // Look up the existing deployment in the reflector cache, taking an
        // owned copy we can mutate.
        let Some(mut existing_deployment) = self
            .deployments
            .get(
                &ObjectRef::new(&new_deployment.name_unchecked())
                    .within(&new_deployment.namespace().unwrap()),
            )
            .map(Arc::unwrap_or_clone)
        else {
            return Ok(());
        };

        if existing_deployment.spec.as_ref().unwrap().selector
            == new_deployment.spec.as_ref().unwrap().selector
        {
            return Ok(());
        }

        warn!("found existing deployment with old label selector, fixing");

        // this is sufficient because the new labels are a superset of the
        // old labels, so the existing label selector should still be valid
        existing_deployment
            .spec
            .as_mut()
            .unwrap()
            .template
            .metadata
            .as_mut()
            .unwrap()
            .labels = new_deployment
            .spec
            .as_ref()
            .unwrap()
            .template
            .metadata
            .as_ref()
            .unwrap()
            .labels
            .clone();

        // using await_condition is not ideal in a controller loop, but this
        // is very temporary and will only ever happen once, so this feels
        // simpler than trying to introduce an entire state machine here
        replace_resource(deployment_api, &existing_deployment).await?;
        // Wait until the controller has observed a generation newer than
        // the one we replaced (i.e. our label change has been picked up).
        await_condition(
            deployment_api.clone(),
            &existing_deployment.name_unchecked(),
            |deployment: Option<&Deployment>| {
                let observed_generation = deployment
                    .and_then(|deployment| deployment.status.as_ref())
                    .and_then(|status| status.observed_generation)
                    .unwrap_or(0);
                let current_generation = deployment
                    .and_then(|deployment| deployment.meta().generation)
                    .unwrap_or(0);
                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
                observed_generation == current_generation
                    && current_generation > previous_generation
            },
        )
        .await
        .map_err(|e| anyhow::anyhow!(e))?;
        // Then wait for the relabeled rollout to finish.
        await_condition(
            deployment_api.clone(),
            &existing_deployment.name_unchecked(),
            is_deployment_completed(),
        )
        .await
        .map_err(|e| anyhow::anyhow!(e))?;

        // delete the deployment but leave the pods around (via
        // DeleteParams::orphan)
        match kube::runtime::wait::delete::delete_and_finalize(
            deployment_api.clone(),
            &existing_deployment.name_unchecked(),
            &DeleteParams::orphan(),
        )
        .await
        {
            Ok(_) => {}
            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
                if e.code == 404 =>
            {
                // the resource already doesn't exist
            }
            Err(e) => return Err(anyhow::anyhow!(e).into()),
        }

        // now, the normal apply of the new deployment (in the main loop)
        // will take over the existing pods from the old deployment we just
        // deleted, since we already updated the pod labels to be the same as
        // the new label selector

        Ok(())
    }

}

#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Balancer;
    type Error = Error;

    /// Reconciles a single `Balancer`: initializes its status subresource
    /// if missing, then applies the external certificate (when configured),
    /// the balancerd deployment, and the service, and finally syncs
    /// deployment readiness back into the balancer's status.
    #[instrument(fields())]
    async fn apply(
        &self,
        client: Client,
        balancer: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        // First reconcile for this resource: populate the status
        // subresource and bail out; the write retriggers reconciliation.
        if balancer.status.is_none() {
            let balancer_api: Api<Balancer> =
                Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
            let mut new_balancer = balancer.clone();
            new_balancer.status = Some(balancer.status());
            balancer_api
                .replace_status(
                    &balancer.name_unchecked(),
                    &PostParams::default(),
                    &new_balancer,
                )
                .await?;
            // Updating the status should trigger a reconciliation
            // which will include a status this time.
            return Ok(None);
        }

        let namespace = balancer.namespace();
        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);

        // `None` means no external certificate should exist for this
        // balancer.
        if let Some(external_certificate) = self.create_external_certificate_object(balancer) {
            trace!("creating new balancerd external certificate");
            apply_resource(&certificate_api, &external_certificate).await?;
        }

        let deployment = self.create_deployment_object(balancer)?;
        // Migrate any deployment with the old label selector before
        // applying the new one (temporary upgrade path; see
        // `fix_deployment`).
        self.fix_deployment(&deployment_api, &deployment).await?;
        trace!("creating new balancerd deployment");
        apply_resource(&deployment_api, &deployment).await?;

        let service = self.create_service_object(balancer);
        trace!("creating new balancerd service");
        apply_resource(&service_api, &service).await?;

        // Reflect deployment readiness into the balancer's `Ready`
        // condition.
        self.sync_deployment_status(&client, balancer).await?;

        Ok(None)
    }
}