mz_orchestratord/controller/
balancer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use anyhow::bail;
13use k8s_openapi::{
14    api::{
15        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
16        core::v1::{
17            Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
18            PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
19            SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
20            Volume, VolumeMount,
21        },
22    },
23    apimachinery::pkg::{
24        apis::meta::v1::{Condition, LabelSelector, Time},
25        util::intstr::IntOrString,
26    },
27};
28use kube::{
29    Api, Client, Resource, ResourceExt,
30    api::{DeleteParams, ObjectMeta, PostParams},
31    runtime::{
32        conditions::is_deployment_completed,
33        controller::Action,
34        reflector::{ObjectRef, Store},
35        wait::await_condition,
36    },
37};
38use maplit::btreemap;
39use tracing::{trace, warn};
40
41use crate::{
42    Error,
43    k8s::{apply_resource, make_reflector, replace_resource},
44    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
45};
46use mz_cloud_resources::crd::{
47    ManagedResource,
48    balancer::v1alpha1::{Balancer, Routing},
49    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
50};
51use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
52use mz_ore::{cli::KeyValueArg, instrument};
53
/// Operator-level settings used when building the Kubernetes objects for a
/// balancer.
pub struct Config {
    /// When true, the balancerd container is given a restrictive
    /// `SecurityContext`; when false, no container security context is set.
    pub enable_security_context: bool,
    /// When true, `prometheus.io/*` scrape annotations are added to the
    /// balancerd pod template.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy set on the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name set on the balancerd pod spec, if any.
    pub scheduler_name: Option<String>,
    /// Key/value pairs copied into the balancerd pod's node selector.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity set on the balancerd pod spec, if any.
    pub balancerd_affinity: Option<Affinity>,
    /// Tolerations set on the balancerd pod spec, if any.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Resource requirements used for the balancerd container when the
    /// balancer's own spec does not provide any.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Default certificate specs, consulted together with the per-balancer
    /// certificate specs to decide TLS configuration.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// environmentd port used in the `--static-resolver-addr` argument.
    pub environmentd_sql_port: u16,
    /// environmentd port used in the `--https-resolver-template` argument.
    pub environmentd_http_port: u16,
    /// Port balancerd listens on for pgwire connections.
    pub balancerd_sql_port: u16,
    /// Port balancerd listens on for HTTPS connections.
    pub balancerd_http_port: u16,
    /// Port balancerd listens on for internal HTTP (probes and metrics).
    pub balancerd_internal_http_port: u16,
}
73
/// Reconciliation context for `Balancer` resources.
pub struct Context {
    /// Settings applied when building balancerd objects.
    config: Config,
    /// Store of `Deployment` objects, used to look up the current state of a
    /// balancer's deployment without querying the API server directly.
    deployments: Store<Deployment>,
}
78
79impl Context {
80    pub async fn new(config: Config, client: Client) -> Self {
81        Self {
82            config,
83            deployments: make_reflector(client).await,
84        }
85    }
86
87    async fn sync_deployment_status(
88        &self,
89        client: &Client,
90        balancer: &Balancer,
91    ) -> Result<(), kube::Error> {
92        let namespace = balancer.namespace();
93        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
94
95        let Some(deployment) = self
96            .deployments
97            .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
98        else {
99            return Ok(());
100        };
101
102        let Some(deployment_conditions) = &deployment
103            .status
104            .as_ref()
105            .and_then(|status| status.conditions.as_ref())
106        else {
107            // if the deployment doesn't have any conditions set yet, there
108            // is nothing to sync
109            return Ok(());
110        };
111
112        let ready = deployment_conditions
113            .iter()
114            .any(|condition| condition.type_ == "Available" && condition.status == "True");
115        let ready_str = if ready { "True" } else { "False" };
116
117        let mut status = balancer.status.clone().unwrap();
118        if status
119            .conditions
120            .iter()
121            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
122        {
123            // if the deployment status is already set correctly, we don't
124            // need to set it again (this prevents us from getting stuck in
125            // a reconcile loop)
126            return Ok(());
127        }
128
129        status.conditions = vec![Condition {
130            type_: "Ready".to_string(),
131            status: ready_str.to_string(),
132            last_transition_time: Time(chrono::offset::Utc::now()),
133            message: format!(
134                "balancerd deployment is{} ready",
135                if ready { "" } else { " not" }
136            ),
137            observed_generation: None,
138            reason: "DeploymentStatus".to_string(),
139        }];
140        let mut new_balancer = balancer.clone();
141        new_balancer.status = Some(status);
142
143        balancer_api
144            .replace_status(
145                &balancer.name_unchecked(),
146                &PostParams::default(),
147                serde_json::to_vec(&new_balancer).unwrap(),
148            )
149            .await?;
150
151        Ok(())
152    }
153
154    fn create_external_certificate_object(&self, balancer: &Balancer) -> Option<Certificate> {
155        create_certificate(
156            self.config
157                .default_certificate_specs
158                .balancerd_external
159                .clone(),
160            balancer,
161            balancer.spec.external_certificate_spec.clone(),
162            balancer.external_certificate_name(),
163            balancer.external_certificate_secret_name(),
164            None,
165            CertificatePrivateKeyAlgorithm::Rsa,
166            Some(4096),
167        )
168    }
169
170    fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
171        let security_context = if self.config.enable_security_context {
172            // Since we want to adhere to the most restrictive security context, all
173            // of these fields have to be set how they are.
174            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
175            Some(SecurityContext {
176                run_as_non_root: Some(true),
177                capabilities: Some(Capabilities {
178                    drop: Some(vec!["ALL".to_string()]),
179                    ..Default::default()
180                }),
181                seccomp_profile: Some(SeccompProfile {
182                    type_: "RuntimeDefault".to_string(),
183                    ..Default::default()
184                }),
185                allow_privilege_escalation: Some(false),
186                ..Default::default()
187            })
188        } else {
189            None
190        };
191
192        let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
193            Some(btreemap! {
194                "prometheus.io/scrape".to_owned() => "true".to_string(),
195                "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
196                "prometheus.io/path".to_owned() => "/metrics".to_string(),
197                "prometheus.io/scheme".to_owned() => "http".to_string(),
198            })
199        } else {
200            None
201        };
202        let mut pod_template_labels = balancer.default_labels();
203        pod_template_labels.insert(
204            "materialize.cloud/name".to_owned(),
205            balancer.deployment_name(),
206        );
207        pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
208        pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
209
210        let ports = vec![
211            ContainerPort {
212                container_port: self.config.balancerd_sql_port.into(),
213                name: Some("pgwire".into()),
214                protocol: Some("TCP".into()),
215                ..Default::default()
216            },
217            ContainerPort {
218                container_port: self.config.balancerd_http_port.into(),
219                name: Some("http".into()),
220                protocol: Some("TCP".into()),
221                ..Default::default()
222            },
223            ContainerPort {
224                container_port: self.config.balancerd_internal_http_port.into(),
225                name: Some("internal-http".into()),
226                protocol: Some("TCP".into()),
227                ..Default::default()
228            },
229        ];
230
231        let mut args = vec![
232            "service".to_string(),
233            format!(
234                "--pgwire-listen-addr=0.0.0.0:{}",
235                self.config.balancerd_sql_port
236            ),
237            format!(
238                "--https-listen-addr=0.0.0.0:{}",
239                self.config.balancerd_http_port
240            ),
241            format!(
242                "--internal-http-listen-addr=0.0.0.0:{}",
243                self.config.balancerd_internal_http_port
244            ),
245        ];
246        match balancer.routing()? {
247            Routing::Static(static_routing_config) => {
248                args.extend([
249                    format!(
250                        "--https-resolver-template={}.{}.svc.cluster.local:{}",
251                        static_routing_config.environmentd_service_name,
252                        static_routing_config.environmentd_namespace,
253                        self.config.environmentd_http_port
254                    ),
255                    format!(
256                        "--static-resolver-addr={}.{}.svc.cluster.local:{}",
257                        static_routing_config.environmentd_service_name,
258                        static_routing_config.environmentd_namespace,
259                        self.config.environmentd_sql_port
260                    ),
261                ]);
262            }
263            Routing::Frontegg(_frontegg_routing_config) => {
264                bail!("frontegg routing is not yet implemented");
265            }
266        }
267
268        if issuer_ref_defined(
269            &self.config.default_certificate_specs.internal,
270            &balancer.spec.internal_certificate_spec,
271        ) {
272            args.push("--internal-tls".to_owned())
273        }
274
275        let mut volumes = Vec::new();
276        let mut volume_mounts = Vec::new();
277        if issuer_ref_defined(
278            &self.config.default_certificate_specs.balancerd_external,
279            &balancer.spec.external_certificate_spec,
280        ) {
281            volumes.push(Volume {
282                name: "external-certificate".to_owned(),
283                secret: Some(SecretVolumeSource {
284                    default_mode: Some(0o400),
285                    secret_name: Some(balancer.external_certificate_secret_name()),
286                    items: None,
287                    optional: Some(false),
288                }),
289                ..Default::default()
290            });
291            volume_mounts.push(VolumeMount {
292                name: "external-certificate".to_owned(),
293                mount_path: "/etc/external_tls".to_owned(),
294                read_only: Some(true),
295                ..Default::default()
296            });
297            args.extend([
298                "--tls-mode=require".into(),
299                "--tls-cert=/etc/external_tls/tls.crt".into(),
300                "--tls-key=/etc/external_tls/tls.key".into(),
301            ]);
302        } else {
303            args.push("--tls-mode=disable".to_string());
304        }
305
306        let startup_probe = Probe {
307            http_get: Some(HTTPGetAction {
308                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
309                path: Some("/api/readyz".into()),
310                ..Default::default()
311            }),
312            failure_threshold: Some(20),
313            initial_delay_seconds: Some(3),
314            period_seconds: Some(3),
315            success_threshold: Some(1),
316            timeout_seconds: Some(1),
317            ..Default::default()
318        };
319        let readiness_probe = Probe {
320            http_get: Some(HTTPGetAction {
321                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
322                path: Some("/api/readyz".into()),
323                ..Default::default()
324            }),
325            failure_threshold: Some(3),
326            period_seconds: Some(10),
327            success_threshold: Some(1),
328            timeout_seconds: Some(1),
329            ..Default::default()
330        };
331        let liveness_probe = Probe {
332            http_get: Some(HTTPGetAction {
333                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
334                path: Some("/api/livez".into()),
335                ..Default::default()
336            }),
337            failure_threshold: Some(3),
338            initial_delay_seconds: Some(8),
339            period_seconds: Some(10),
340            success_threshold: Some(1),
341            timeout_seconds: Some(1),
342            ..Default::default()
343        };
344
345        let container = Container {
346            name: "balancerd".to_owned(),
347            image: Some(balancer.spec.balancerd_image_ref.clone()),
348            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
349            ports: Some(ports),
350            args: Some(args),
351            startup_probe: Some(startup_probe),
352            readiness_probe: Some(readiness_probe),
353            liveness_probe: Some(liveness_probe),
354            resources: balancer
355                .spec
356                .resource_requirements
357                .clone()
358                .or_else(|| self.config.balancerd_default_resources.clone()),
359            security_context: security_context.clone(),
360            volume_mounts: Some(volume_mounts),
361            ..Default::default()
362        };
363
364        let deployment_spec = DeploymentSpec {
365            replicas: Some(balancer.replicas()),
366            selector: LabelSelector {
367                match_labels: Some(pod_template_labels.clone()),
368                ..Default::default()
369            },
370            strategy: Some(DeploymentStrategy {
371                rolling_update: Some(RollingUpdateDeployment {
372                    // Allow a complete set of new pods at once, to minimize the
373                    // chances of a new connection going to a pod that will be
374                    // immediately drained
375                    max_surge: Some(IntOrString::String("100%".into())),
376                    ..Default::default()
377                }),
378                ..Default::default()
379            }),
380            template: PodTemplateSpec {
381                // not using managed_resource_meta because the pod should be
382                // owned by the deployment, not the materialize instance
383                metadata: Some(ObjectMeta {
384                    annotations: pod_template_annotations,
385                    labels: Some(pod_template_labels),
386                    ..Default::default()
387                }),
388                spec: Some(PodSpec {
389                    containers: vec![container],
390                    node_selector: Some(
391                        self.config
392                            .balancerd_node_selector
393                            .iter()
394                            .map(|selector| (selector.key.clone(), selector.value.clone()))
395                            .collect(),
396                    ),
397                    affinity: self.config.balancerd_affinity.clone(),
398                    tolerations: self.config.balancerd_tolerations.clone(),
399                    security_context: Some(PodSecurityContext {
400                        fs_group: Some(999),
401                        run_as_user: Some(999),
402                        run_as_group: Some(999),
403                        ..Default::default()
404                    }),
405                    scheduler_name: self.config.scheduler_name.clone(),
406                    volumes: Some(volumes),
407                    ..Default::default()
408                }),
409            },
410            ..Default::default()
411        };
412
413        Ok(Deployment {
414            metadata: balancer.managed_resource_meta(balancer.deployment_name()),
415            spec: Some(deployment_spec),
416            status: None,
417        })
418    }
419
420    fn create_service_object(&self, balancer: &Balancer) -> Service {
421        let selector =
422            btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
423
424        let ports = vec![
425            ServicePort {
426                name: Some("http".to_string()),
427                protocol: Some("TCP".to_string()),
428                port: self.config.balancerd_http_port.into(),
429                target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
430                ..Default::default()
431            },
432            ServicePort {
433                name: Some("pgwire".to_string()),
434                protocol: Some("TCP".to_string()),
435                port: self.config.balancerd_sql_port.into(),
436                target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
437                ..Default::default()
438            },
439        ];
440
441        let spec = ServiceSpec {
442            type_: Some("ClusterIP".to_string()),
443            cluster_ip: Some("None".to_string()),
444            selector: Some(selector),
445            ports: Some(ports),
446            ..Default::default()
447        };
448
449        Service {
450            metadata: balancer.managed_resource_meta(balancer.service_name()),
451            spec: Some(spec),
452            status: None,
453        }
454    }
455
456    // TODO: remove this once everyone is upgraded to an orchestratord
457    // version with the separate balancer operator
458    async fn fix_deployment(
459        &self,
460        deployment_api: &Api<Deployment>,
461        new_deployment: &Deployment,
462    ) -> Result<(), Error> {
463        let Some(mut existing_deployment) = self
464            .deployments
465            .get(
466                &ObjectRef::new(&new_deployment.name_unchecked())
467                    .within(&new_deployment.namespace().unwrap()),
468            )
469            .map(Arc::unwrap_or_clone)
470        else {
471            return Ok(());
472        };
473
474        if existing_deployment.spec.as_ref().unwrap().selector
475            == new_deployment.spec.as_ref().unwrap().selector
476        {
477            return Ok(());
478        }
479
480        warn!("found existing deployment with old label selector, fixing");
481
482        // this is sufficient because the new labels are a superset of the
483        // old labels, so the existing label selector should still be valid
484        existing_deployment
485            .spec
486            .as_mut()
487            .unwrap()
488            .template
489            .metadata
490            .as_mut()
491            .unwrap()
492            .labels = new_deployment
493            .spec
494            .as_ref()
495            .unwrap()
496            .template
497            .metadata
498            .as_ref()
499            .unwrap()
500            .labels
501            .clone();
502
503        // using await_condition is not ideal in a controller loop, but this
504        // is very temporary and will only ever happen once, so this feels
505        // simpler than trying to introduce an entire state machine here
506        replace_resource(deployment_api, &existing_deployment).await?;
507        await_condition(
508            deployment_api.clone(),
509            &existing_deployment.name_unchecked(),
510            |deployment: Option<&Deployment>| {
511                let observed_generation = deployment
512                    .and_then(|deployment| deployment.status.as_ref())
513                    .and_then(|status| status.observed_generation)
514                    .unwrap_or(0);
515                let current_generation = deployment
516                    .and_then(|deployment| deployment.meta().generation)
517                    .unwrap_or(0);
518                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
519                observed_generation == current_generation
520                    && current_generation > previous_generation
521            },
522        )
523        .await
524        .map_err(|e| anyhow::anyhow!(e))?;
525        await_condition(
526            deployment_api.clone(),
527            &existing_deployment.name_unchecked(),
528            is_deployment_completed(),
529        )
530        .await
531        .map_err(|e| anyhow::anyhow!(e))?;
532
533        // delete the deployment but leave the pods around (via
534        // DeleteParams::orphan)
535        match kube::runtime::wait::delete::delete_and_finalize(
536            deployment_api.clone(),
537            &existing_deployment.name_unchecked(),
538            &DeleteParams::orphan(),
539        )
540        .await
541        {
542            Ok(_) => {}
543            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
544                if e.code == 404 =>
545            {
546                // the resource already doesn't exist
547            }
548            Err(e) => return Err(anyhow::anyhow!(e).into()),
549        }
550
551        // now, the normal apply of the new deployment (in the main loop)
552        // will take over the existing pods from the old deployment we just
553        // deleted, since we already updated the pod labels to be the same as
554        // the new label selector
555
556        Ok(())
557    }
558}
559
560#[async_trait::async_trait]
561impl k8s_controller::Context for Context {
562    type Resource = Balancer;
563    type Error = Error;
564
565    #[instrument(fields())]
566    async fn apply(
567        &self,
568        client: Client,
569        balancer: &Self::Resource,
570    ) -> Result<Option<Action>, Self::Error> {
571        if balancer.status.is_none() {
572            let balancer_api: Api<Balancer> =
573                Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
574            let mut new_balancer = balancer.clone();
575            new_balancer.status = Some(balancer.status());
576            balancer_api
577                .replace_status(
578                    &balancer.name_unchecked(),
579                    &PostParams::default(),
580                    serde_json::to_vec(&new_balancer).unwrap(),
581                )
582                .await?;
583            // Updating the status should trigger a reconciliation
584            // which will include a status this time.
585            return Ok(None);
586        }
587
588        let namespace = balancer.namespace();
589        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
590        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
591        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
592
593        if let Some(external_certificate) = self.create_external_certificate_object(balancer) {
594            trace!("creating new balancerd external certificate");
595            apply_resource(&certificate_api, &external_certificate).await?;
596        }
597
598        let deployment = self.create_deployment_object(balancer)?;
599        self.fix_deployment(&deployment_api, &deployment).await?;
600        trace!("creating new balancerd deployment");
601        apply_resource(&deployment_api, &deployment).await?;
602
603        let service = self.create_service_object(balancer);
604        trace!("creating new balancerd service");
605        apply_resource(&service_api, &service).await?;
606
607        self.sync_deployment_status(&client, balancer).await?;
608
609        Ok(None)
610    }
611}