// mz_orchestratord/controller/balancer.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use anyhow::bail;
13use k8s_openapi::{
14    api::{
15        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
16        core::v1::{
17            Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
18            PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
19            SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
20            Volume, VolumeMount,
21        },
22    },
23    apimachinery::pkg::{
24        apis::meta::v1::{Condition, LabelSelector, Time},
25        util::intstr::IntOrString,
26    },
27    jiff::Timestamp,
28};
29use kube::{
30    Api, Client, Resource, ResourceExt,
31    api::{DeleteParams, ObjectMeta, PostParams},
32    runtime::{
33        conditions::is_deployment_completed,
34        controller::Action,
35        reflector::{ObjectRef, Store},
36        wait::await_condition,
37    },
38};
39use maplit::btreemap;
40use tracing::{trace, warn};
41
42use crate::{
43    Error,
44    k8s::{apply_resource, make_reflector, replace_resource},
45    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
46};
47use mz_cloud_resources::crd::{
48    ManagedResource,
49    balancer::v1alpha1::{Balancer, Routing},
50    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
51};
52use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
53use mz_ore::{cli::KeyValueArg, instrument};
54
/// Operator-wide configuration for the balancerd controller, shared by every
/// `Balancer` reconcile (presumably populated from orchestratord's CLI flags
/// — see `KeyValueArg` usage).
pub struct Config {
    /// When set, balancerd containers get a restricted `SecurityContext`
    /// (non-root, no capabilities, no privilege escalation).
    pub enable_security_context: bool,
    /// When set, pod templates get `prometheus.io/*` scrape annotations
    /// pointing at the internal HTTP port's `/metrics` endpoint.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy applied to the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Optional custom scheduler name for balancerd pods.
    pub scheduler_name: Option<String>,
    /// Node selector entries applied to balancerd pods.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Optional affinity rules for balancerd pods.
    pub balancerd_affinity: Option<Affinity>,
    /// Optional tolerations for balancerd pods.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Resource requests/limits used when a `Balancer` doesn't specify its own.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Fallback certificate specs used when a `Balancer` does not provide
    /// its own internal/external certificate specs.
    pub default_certificate_specs: DefaultCertificateSpecs,

    // Ports environmentd listens on (the routing targets for static routing)
    // and the ports balancerd itself binds.
    pub environmentd_sql_port: u16,
    pub environmentd_http_port: u16,
    pub balancerd_sql_port: u16,
    pub balancerd_http_port: u16,
    pub balancerd_internal_http_port: u16,
}
74
/// Per-controller state shared across reconciles of `Balancer` resources.
pub struct Context {
    /// Static controller configuration; see [`Config`].
    config: Config,
    /// Watch-backed cache (reflector store) of `Deployment` objects, so
    /// reconciles can read deployment state without an API round trip.
    deployments: Store<Deployment>,
}
79
impl Context {
    /// Builds a new controller context, starting the `Deployment` reflector
    /// so later reconciles can read deployment state from a local cache.
    pub async fn new(config: Config, client: Client) -> Self {
        Self {
            config,
            deployments: make_reflector(client).await,
        }
    }

    /// Mirrors the readiness of the balancerd `Deployment` into the
    /// `Balancer`'s status conditions (a single `Ready` condition), written
    /// through the status subresource.
    ///
    /// No-ops when the deployment is absent from the reflector cache, has no
    /// conditions yet, or the balancer status already reflects the current
    /// readiness (the last case avoids a self-triggering reconcile loop).
    async fn sync_deployment_status(
        &self,
        client: &Client,
        balancer: &Balancer,
    ) -> Result<(), kube::Error> {
        let namespace = balancer.namespace();
        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);

        // Read from the reflector cache, not the API server.
        let Some(deployment) = self
            .deployments
            .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
        else {
            return Ok(());
        };

        let Some(deployment_conditions) = &deployment
            .status
            .as_ref()
            .and_then(|status| status.conditions.as_ref())
        else {
            // if the deployment doesn't have any conditions set yet, there
            // is nothing to sync
            return Ok(());
        };

        // The deployment is considered ready iff its `Available` condition
        // is `True`.
        let ready = deployment_conditions
            .iter()
            .any(|condition| condition.type_ == "Available" && condition.status == "True");
        let ready_str = if ready { "True" } else { "False" };

        // NOTE(review): this unwrap relies on `apply` having initialized
        // `status` before anything else runs — confirm no other caller can
        // reach here with `status == None`.
        let mut status = balancer.status.clone().unwrap();
        if status
            .conditions
            .iter()
            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
        {
            // if the deployment status is already set correctly, we don't
            // need to set it again (this prevents us from getting stuck in
            // a reconcile loop)
            return Ok(());
        }

        // Replace the conditions wholesale with the single `Ready` condition.
        status.conditions = vec![Condition {
            type_: "Ready".to_string(),
            status: ready_str.to_string(),
            last_transition_time: Time(Timestamp::now()),
            message: format!(
                "balancerd deployment is{} ready",
                if ready { "" } else { " not" }
            ),
            observed_generation: None,
            reason: "DeploymentStatus".to_string(),
        }];
        let mut new_balancer = balancer.clone();
        new_balancer.status = Some(status);

        balancer_api
            .replace_status(
                &balancer.name_unchecked(),
                &PostParams::default(),
                &new_balancer,
            )
            .await?;

        Ok(())
    }

    /// Builds the cert-manager `Certificate` object for balancerd's external
    /// (client-facing) TLS endpoint, preferring the balancer's own spec over
    /// the configured default. Returns `Ok(None)` when `create_certificate`
    /// decides no certificate should be created.
    fn create_external_certificate_object(
        &self,
        balancer: &Balancer,
    ) -> Result<Option<Certificate>, anyhow::Error> {
        create_certificate(
            self.config
                .default_certificate_specs
                .balancerd_external
                .clone(),
            balancer,
            balancer.spec.external_certificate_spec.clone(),
            balancer.external_certificate_name(),
            balancer.external_certificate_secret_name(),
            None,
            CertificatePrivateKeyAlgorithm::Ecdsa,
            Some(256),
        )
    }

    /// Builds the balancerd `Deployment` for this `Balancer`: container
    /// ports, command-line args (routing, TLS), probes, security context,
    /// certificate volume mounts, and scheduling constraints.
    ///
    /// # Errors
    ///
    /// Fails if `balancer.routing()` errors or requests Frontegg routing,
    /// which is not yet implemented.
    fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
        let security_context = if self.config.enable_security_context {
            // Since we want to adhere to the most restrictive security context, all
            // of these fields have to be set how they are.
            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
            Some(SecurityContext {
                run_as_non_root: Some(true),
                capabilities: Some(Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                seccomp_profile: Some(SeccompProfile {
                    type_: "RuntimeDefault".to_string(),
                    ..Default::default()
                }),
                allow_privilege_escalation: Some(false),
                ..Default::default()
            })
        } else {
            None
        };

        // Point Prometheus at the internal HTTP port's /metrics endpoint.
        let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
            Some(btreemap! {
                "prometheus.io/scrape".to_owned() => "true".to_string(),
                "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
                "prometheus.io/path".to_owned() => "/metrics".to_string(),
                "prometheus.io/scheme".to_owned() => "http".to_string(),
            })
        } else {
            None
        };
        // Pod labels double as the deployment's (and service's) selector, so
        // they must uniquely identify this balancer's pods.
        let mut pod_template_labels = balancer.default_labels();
        pod_template_labels.insert(
            "materialize.cloud/name".to_owned(),
            balancer.deployment_name(),
        );
        pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
        pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());

        let ports = vec![
            ContainerPort {
                container_port: self.config.balancerd_sql_port.into(),
                name: Some("pgwire".into()),
                protocol: Some("TCP".into()),
                ..Default::default()
            },
            ContainerPort {
                container_port: self.config.balancerd_http_port.into(),
                name: Some("http".into()),
                protocol: Some("TCP".into()),
                ..Default::default()
            },
            ContainerPort {
                container_port: self.config.balancerd_internal_http_port.into(),
                name: Some("internal-http".into()),
                protocol: Some("TCP".into()),
                ..Default::default()
            },
        ];

        let mut args = vec![
            "service".to_string(),
            format!(
                "--pgwire-listen-addr=0.0.0.0:{}",
                self.config.balancerd_sql_port
            ),
            format!(
                "--https-listen-addr=0.0.0.0:{}",
                self.config.balancerd_http_port
            ),
            format!(
                "--internal-http-listen-addr=0.0.0.0:{}",
                self.config.balancerd_internal_http_port
            ),
        ];
        // Tell balancerd where to route SQL and HTTP traffic.
        match balancer.routing()? {
            Routing::Static(static_routing_config) => {
                // Static routing: point everything at one environmentd's
                // in-cluster service DNS name.
                args.extend([
                    format!(
                        "--https-resolver-template={}.{}.svc.cluster.local:{}",
                        static_routing_config.environmentd_service_name,
                        static_routing_config.environmentd_namespace,
                        self.config.environmentd_http_port
                    ),
                    format!(
                        "--static-resolver-addr={}.{}.svc.cluster.local:{}",
                        static_routing_config.environmentd_service_name,
                        static_routing_config.environmentd_namespace,
                        self.config.environmentd_sql_port
                    ),
                ]);
            }
            Routing::Frontegg(_frontegg_routing_config) => {
                bail!("frontegg routing is not yet implemented");
            }
        }

        // Internal (balancerd -> environmentd) TLS, if an issuer is configured.
        if issuer_ref_defined(
            &self.config.default_certificate_specs.internal,
            &balancer.spec.internal_certificate_spec,
        ) {
            args.push("--internal-tls".to_owned())
        }

        // External (client-facing) TLS: mount the certificate secret and
        // require TLS if an issuer is configured; otherwise disable TLS.
        let mut volumes = Vec::new();
        let mut volume_mounts = Vec::new();
        if issuer_ref_defined(
            &self.config.default_certificate_specs.balancerd_external,
            &balancer.spec.external_certificate_spec,
        ) {
            volumes.push(Volume {
                name: "external-certificate".to_owned(),
                secret: Some(SecretVolumeSource {
                    default_mode: Some(0o400),
                    secret_name: Some(balancer.external_certificate_secret_name()),
                    items: None,
                    optional: Some(false),
                }),
                ..Default::default()
            });
            volume_mounts.push(VolumeMount {
                name: "external-certificate".to_owned(),
                mount_path: "/etc/external_tls".to_owned(),
                read_only: Some(true),
                ..Default::default()
            });
            args.extend([
                "--tls-mode=require".into(),
                "--tls-cert=/etc/external_tls/tls.crt".into(),
                "--tls-key=/etc/external_tls/tls.key".into(),
            ]);
        } else {
            args.push("--tls-mode=disable".to_string());
        }

        // All probes hit the internal HTTP port; startup/readiness use
        // /api/readyz, liveness uses /api/livez.
        let startup_probe = Probe {
            http_get: Some(HTTPGetAction {
                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
                path: Some("/api/readyz".into()),
                ..Default::default()
            }),
            failure_threshold: Some(20),
            initial_delay_seconds: Some(3),
            period_seconds: Some(3),
            success_threshold: Some(1),
            timeout_seconds: Some(1),
            ..Default::default()
        };
        let readiness_probe = Probe {
            http_get: Some(HTTPGetAction {
                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
                path: Some("/api/readyz".into()),
                ..Default::default()
            }),
            failure_threshold: Some(3),
            period_seconds: Some(10),
            success_threshold: Some(1),
            timeout_seconds: Some(1),
            ..Default::default()
        };
        let liveness_probe = Probe {
            http_get: Some(HTTPGetAction {
                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
                path: Some("/api/livez".into()),
                ..Default::default()
            }),
            failure_threshold: Some(3),
            initial_delay_seconds: Some(8),
            period_seconds: Some(10),
            success_threshold: Some(1),
            timeout_seconds: Some(1),
            ..Default::default()
        };

        let container = Container {
            name: "balancerd".to_owned(),
            image: Some(balancer.spec.balancerd_image_ref.clone()),
            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
            ports: Some(ports),
            args: Some(args),
            startup_probe: Some(startup_probe),
            readiness_probe: Some(readiness_probe),
            liveness_probe: Some(liveness_probe),
            // The balancer's own resource requirements win over the
            // operator-wide default.
            resources: balancer
                .spec
                .resource_requirements
                .clone()
                .or_else(|| self.config.balancerd_default_resources.clone()),
            security_context: security_context.clone(),
            volume_mounts: Some(volume_mounts),
            ..Default::default()
        };

        let deployment_spec = DeploymentSpec {
            replicas: Some(balancer.replicas()),
            selector: LabelSelector {
                match_labels: Some(pod_template_labels.clone()),
                ..Default::default()
            },
            strategy: Some(DeploymentStrategy {
                rolling_update: Some(RollingUpdateDeployment {
                    // Allow a complete set of new pods at once, to minimize the
                    // chances of a new connection going to a pod that will be
                    // immediately drained
                    max_surge: Some(IntOrString::String("100%".into())),
                    ..Default::default()
                }),
                ..Default::default()
            }),
            template: PodTemplateSpec {
                // not using managed_resource_meta because the pod should be
                // owned by the deployment, not the materialize instance
                metadata: Some(ObjectMeta {
                    annotations: pod_template_annotations,
                    labels: Some(pod_template_labels),
                    ..Default::default()
                }),
                spec: Some(PodSpec {
                    containers: vec![container],
                    node_selector: Some(
                        self.config
                            .balancerd_node_selector
                            .iter()
                            .map(|selector| (selector.key.clone(), selector.value.clone()))
                            .collect(),
                    ),
                    affinity: self.config.balancerd_affinity.clone(),
                    tolerations: self.config.balancerd_tolerations.clone(),
                    security_context: Some(PodSecurityContext {
                        fs_group: Some(999),
                        run_as_user: Some(999),
                        run_as_group: Some(999),
                        ..Default::default()
                    }),
                    scheduler_name: self.config.scheduler_name.clone(),
                    volumes: Some(volumes),
                    ..Default::default()
                }),
            },
            ..Default::default()
        };

        Ok(Deployment {
            metadata: balancer.managed_resource_meta(balancer.deployment_name()),
            spec: Some(deployment_spec),
            status: None,
        })
    }

    /// Builds the `Service` fronting the balancerd pods, exposing the pgwire
    /// and HTTP ports. `cluster_ip: "None"` makes this a headless service,
    /// so DNS resolves directly to the individual pod IPs.
    fn create_service_object(&self, balancer: &Balancer) -> Service {
        // Select only the pods of this balancer's deployment.
        let selector =
            btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};

        let ports = vec![
            ServicePort {
                name: Some("http".to_string()),
                protocol: Some("TCP".to_string()),
                port: self.config.balancerd_http_port.into(),
                target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
                ..Default::default()
            },
            ServicePort {
                name: Some("pgwire".to_string()),
                protocol: Some("TCP".to_string()),
                port: self.config.balancerd_sql_port.into(),
                target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
                ..Default::default()
            },
        ];

        let spec = ServiceSpec {
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
            selector: Some(selector),
            ports: Some(ports),
            ..Default::default()
        };

        Service {
            metadata: balancer.managed_resource_meta(balancer.service_name()),
            spec: Some(spec),
            status: None,
        }
    }

    // TODO: remove this once everyone is upgraded to an orchestratord
    // version with the separate balancer operator
    /// One-time migration for deployments created with an old label
    /// selector. Deployment selectors are immutable in Kubernetes, so rather
    /// than patching in place this:
    ///
    /// 1. relabels the old deployment's pod template with the new labels and
    ///    waits for that rollout to complete, then
    /// 2. deletes the old deployment with `DeleteParams::orphan`, leaving
    ///    its pods running so the new deployment — applied right after this
    ///    in `apply` — adopts them without downtime.
    ///
    /// No-ops when no old deployment exists in the cache or its selector
    /// already matches the new one.
    async fn fix_deployment(
        &self,
        deployment_api: &Api<Deployment>,
        new_deployment: &Deployment,
    ) -> Result<(), Error> {
        let Some(mut existing_deployment) = self
            .deployments
            .get(
                &ObjectRef::new(&new_deployment.name_unchecked())
                    .within(&new_deployment.namespace().unwrap()),
            )
            .map(Arc::unwrap_or_clone)
        else {
            return Ok(());
        };

        if existing_deployment.spec.as_ref().unwrap().selector
            == new_deployment.spec.as_ref().unwrap().selector
        {
            return Ok(());
        }

        warn!("found existing deployment with old label selector, fixing");

        // this is sufficient because the new labels are a superset of the
        // old labels, so the existing label selector should still be valid
        existing_deployment
            .spec
            .as_mut()
            .unwrap()
            .template
            .metadata
            .as_mut()
            .unwrap()
            .labels = new_deployment
            .spec
            .as_ref()
            .unwrap()
            .template
            .metadata
            .as_ref()
            .unwrap()
            .labels
            .clone();

        // using await_condition is not ideal in a controller loop, but this
        // is very temporary and will only ever happen once, so this feels
        // simpler than trying to introduce an entire state machine here
        replace_resource(deployment_api, &existing_deployment).await?;
        // First wait until the controller has observed a generation newer
        // than the one we replaced (i.e. our label change was picked up)...
        await_condition(
            deployment_api.clone(),
            &existing_deployment.name_unchecked(),
            |deployment: Option<&Deployment>| {
                let observed_generation = deployment
                    .and_then(|deployment| deployment.status.as_ref())
                    .and_then(|status| status.observed_generation)
                    .unwrap_or(0);
                let current_generation = deployment
                    .and_then(|deployment| deployment.meta().generation)
                    .unwrap_or(0);
                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
                observed_generation == current_generation
                    && current_generation > previous_generation
            },
        )
        .await
        .map_err(|e| anyhow::anyhow!(e))?;
        // ...then wait for the relabel rollout to finish.
        await_condition(
            deployment_api.clone(),
            &existing_deployment.name_unchecked(),
            is_deployment_completed(),
        )
        .await
        .map_err(|e| anyhow::anyhow!(e))?;

        // delete the deployment but leave the pods around (via
        // DeleteParams::orphan)
        match kube::runtime::wait::delete::delete_and_finalize(
            deployment_api.clone(),
            &existing_deployment.name_unchecked(),
            &DeleteParams::orphan(),
        )
        .await
        {
            Ok(_) => {}
            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
                if e.code == 404 =>
            {
                // the resource already doesn't exist
            }
            Err(e) => return Err(anyhow::anyhow!(e).into()),
        }

        // now, the normal apply of the new deployment (in the main loop)
        // will take over the existing pods from the old deployment we just
        // deleted, since we already updated the pod labels to be the same as
        // the new label selector

        Ok(())
    }
}
563
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Balancer;
    type Error = Error;

    /// Reconciles a single `Balancer`:
    ///
    /// 1. If the resource has no status yet, initialize it via the status
    ///    subresource and return early — the status write re-triggers
    ///    reconciliation, which then proceeds with a status present.
    /// 2. Apply the external certificate (when one is configured), then the
    ///    deployment (after the legacy-selector migration in
    ///    `fix_deployment`), then the service.
    /// 3. Mirror the deployment's readiness back into the balancer's status.
    #[instrument(fields())]
    async fn apply(
        &self,
        client: Client,
        balancer: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        if balancer.status.is_none() {
            let balancer_api: Api<Balancer> =
                Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
            let mut new_balancer = balancer.clone();
            new_balancer.status = Some(balancer.status());
            balancer_api
                .replace_status(
                    &balancer.name_unchecked(),
                    &PostParams::default(),
                    &new_balancer,
                )
                .await?;
            // Updating the status should trigger a reconciliation
            // which will include a status this time.
            return Ok(None);
        }

        let namespace = balancer.namespace();
        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);

        if let Some(external_certificate) = self.create_external_certificate_object(balancer)? {
            trace!("creating new balancerd external certificate");
            apply_resource(&certificate_api, &external_certificate).await?;
        }

        let deployment = self.create_deployment_object(balancer)?;
        // Migrate any deployment created with the old label selector before
        // applying the new one, so the new deployment can adopt its pods.
        self.fix_deployment(&deployment_api, &deployment).await?;
        trace!("creating new balancerd deployment");
        apply_resource(&deployment_api, &deployment).await?;

        let service = self.create_service_object(balancer);
        trace!("creating new balancerd service");
        apply_resource(&service_api, &service).await?;

        self.sync_deployment_status(&client, balancer).await?;

        Ok(None)
    }
}
615}