// mz_orchestratord/controller/balancer.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use anyhow::bail;
11use k8s_openapi::{
12    api::{
13        apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
14        core::v1::{
15            Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
16            PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
17            SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
18            Volume, VolumeMount,
19        },
20    },
21    apimachinery::pkg::{
22        apis::meta::v1::{Condition, LabelSelector, Time},
23        util::intstr::IntOrString,
24    },
25};
26use kube::{
27    Api, Client, Resource, ResourceExt,
28    api::{ObjectMeta, PostParams},
29    runtime::{
30        controller::Action,
31        reflector::{ObjectRef, Store},
32    },
33};
34use maplit::btreemap;
35use tracing::trace;
36
37use crate::{
38    Error,
39    k8s::{apply_resource, make_reflector},
40    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
41};
42use mz_cloud_resources::crd::{
43    ManagedResource,
44    balancer::v1alpha1::{Balancer, Routing},
45    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
46};
47use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
48use mz_ore::{cli::KeyValueArg, instrument};
49
50pub struct Config {
51    pub enable_security_context: bool,
52    pub enable_prometheus_scrape_annotations: bool,
53
54    pub image_pull_policy: KubernetesImagePullPolicy,
55    pub scheduler_name: Option<String>,
56    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
57    pub balancerd_affinity: Option<Affinity>,
58    pub balancerd_tolerations: Option<Vec<Toleration>>,
59    pub balancerd_default_resources: Option<ResourceRequirements>,
60
61    pub default_certificate_specs: DefaultCertificateSpecs,
62
63    pub environmentd_sql_port: u16,
64    pub environmentd_http_port: u16,
65    pub balancerd_sql_port: u16,
66    pub balancerd_http_port: u16,
67    pub balancerd_internal_http_port: u16,
68}
69
70pub struct Context {
71    config: Config,
72    deployments: Store<Deployment>,
73}
74
75impl Context {
76    pub async fn new(config: Config, client: Client) -> Self {
77        Self {
78            config,
79            deployments: make_reflector(client).await,
80        }
81    }
82
83    async fn sync_deployment_status(
84        &self,
85        client: &Client,
86        balancer: &Balancer,
87    ) -> Result<(), kube::Error> {
88        let namespace = balancer.namespace().unwrap();
89        let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
90
91        let Some(deployment) = self
92            .deployments
93            .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
94        else {
95            return Ok(());
96        };
97
98        let Some(deployment_conditions) = &deployment
99            .status
100            .as_ref()
101            .and_then(|status| status.conditions.as_ref())
102        else {
103            // if the deployment doesn't have any conditions set yet, there
104            // is nothing to sync
105            return Ok(());
106        };
107
108        let ready = deployment_conditions
109            .iter()
110            .any(|condition| condition.type_ == "Available" && condition.status == "True");
111        let ready_str = if ready { "True" } else { "False" };
112
113        let mut status = balancer.status.clone().unwrap();
114        if status
115            .conditions
116            .iter()
117            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
118        {
119            // if the deployment status is already set correctly, we don't
120            // need to set it again (this prevents us from getting stuck in
121            // a reconcile loop)
122            return Ok(());
123        }
124
125        status.conditions = vec![Condition {
126            type_: "Ready".to_string(),
127            status: ready_str.to_string(),
128            last_transition_time: Time(chrono::offset::Utc::now()),
129            message: format!(
130                "balancerd deployment is{} ready",
131                if ready { "" } else { " not" }
132            ),
133            observed_generation: None,
134            reason: "DeploymentStatus".to_string(),
135        }];
136        let mut new_balancer = balancer.clone();
137        new_balancer.status = Some(status);
138
139        balancer_api
140            .replace_status(
141                &balancer.name_unchecked(),
142                &PostParams::default(),
143                serde_json::to_vec(&new_balancer).unwrap(),
144            )
145            .await?;
146
147        Ok(())
148    }
149
150    fn create_external_certificate_object(&self, balancer: &Balancer) -> Option<Certificate> {
151        create_certificate(
152            self.config
153                .default_certificate_specs
154                .balancerd_external
155                .clone(),
156            balancer,
157            balancer.spec.external_certificate_spec.clone(),
158            balancer.external_certificate_name(),
159            balancer.external_certificate_secret_name(),
160            None,
161            CertificatePrivateKeyAlgorithm::Rsa,
162            Some(4096),
163        )
164    }
165
166    fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
167        let security_context = if self.config.enable_security_context {
168            // Since we want to adhere to the most restrictive security context, all
169            // of these fields have to be set how they are.
170            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
171            Some(SecurityContext {
172                run_as_non_root: Some(true),
173                capabilities: Some(Capabilities {
174                    drop: Some(vec!["ALL".to_string()]),
175                    ..Default::default()
176                }),
177                seccomp_profile: Some(SeccompProfile {
178                    type_: "RuntimeDefault".to_string(),
179                    ..Default::default()
180                }),
181                allow_privilege_escalation: Some(false),
182                ..Default::default()
183            })
184        } else {
185            None
186        };
187
188        let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
189            Some(btreemap! {
190                "prometheus.io/scrape".to_owned() => "true".to_string(),
191                "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
192                "prometheus.io/path".to_owned() => "/metrics".to_string(),
193                "prometheus.io/scheme".to_owned() => "http".to_string(),
194            })
195        } else {
196            None
197        };
198        let mut pod_template_labels = balancer.default_labels();
199        pod_template_labels.insert(
200            "materialize.cloud/name".to_owned(),
201            balancer.deployment_name(),
202        );
203        pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
204        pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
205
206        let ports = vec![
207            ContainerPort {
208                container_port: self.config.balancerd_sql_port.into(),
209                name: Some("pgwire".into()),
210                protocol: Some("TCP".into()),
211                ..Default::default()
212            },
213            ContainerPort {
214                container_port: self.config.balancerd_http_port.into(),
215                name: Some("http".into()),
216                protocol: Some("TCP".into()),
217                ..Default::default()
218            },
219            ContainerPort {
220                container_port: self.config.balancerd_internal_http_port.into(),
221                name: Some("internal-http".into()),
222                protocol: Some("TCP".into()),
223                ..Default::default()
224            },
225        ];
226
227        let mut args = vec![
228            "service".to_string(),
229            format!(
230                "--pgwire-listen-addr=0.0.0.0:{}",
231                self.config.balancerd_sql_port
232            ),
233            format!(
234                "--https-listen-addr=0.0.0.0:{}",
235                self.config.balancerd_http_port
236            ),
237            format!(
238                "--internal-http-listen-addr=0.0.0.0:{}",
239                self.config.balancerd_internal_http_port
240            ),
241        ];
242        match balancer.routing()? {
243            Routing::Static(static_routing_config) => {
244                args.extend([
245                    format!(
246                        "--https-resolver-template={}.{}.svc.cluster.local:{}",
247                        static_routing_config.environmentd_service_name,
248                        static_routing_config.environmentd_namespace,
249                        self.config.environmentd_http_port
250                    ),
251                    format!(
252                        "--static-resolver-addr={}.{}.svc.cluster.local:{}",
253                        static_routing_config.environmentd_service_name,
254                        static_routing_config.environmentd_namespace,
255                        self.config.environmentd_sql_port
256                    ),
257                ]);
258            }
259            Routing::Frontegg(_frontegg_routing_config) => {
260                bail!("frontegg routing is not yet implemented");
261            }
262        }
263
264        if issuer_ref_defined(
265            &self.config.default_certificate_specs.internal,
266            &balancer.spec.internal_certificate_spec,
267        ) {
268            args.push("--internal-tls".to_owned())
269        }
270
271        let mut volumes = Vec::new();
272        let mut volume_mounts = Vec::new();
273        if issuer_ref_defined(
274            &self.config.default_certificate_specs.balancerd_external,
275            &balancer.spec.external_certificate_spec,
276        ) {
277            volumes.push(Volume {
278                name: "external-certificate".to_owned(),
279                secret: Some(SecretVolumeSource {
280                    default_mode: Some(0o400),
281                    secret_name: Some(balancer.external_certificate_secret_name()),
282                    items: None,
283                    optional: Some(false),
284                }),
285                ..Default::default()
286            });
287            volume_mounts.push(VolumeMount {
288                name: "external-certificate".to_owned(),
289                mount_path: "/etc/external_tls".to_owned(),
290                read_only: Some(true),
291                ..Default::default()
292            });
293            args.extend([
294                "--tls-mode=require".into(),
295                "--tls-cert=/etc/external_tls/tls.crt".into(),
296                "--tls-key=/etc/external_tls/tls.key".into(),
297            ]);
298        } else {
299            args.push("--tls-mode=disable".to_string());
300        }
301
302        let startup_probe = Probe {
303            http_get: Some(HTTPGetAction {
304                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
305                path: Some("/api/readyz".into()),
306                ..Default::default()
307            }),
308            failure_threshold: Some(20),
309            initial_delay_seconds: Some(3),
310            period_seconds: Some(3),
311            success_threshold: Some(1),
312            timeout_seconds: Some(1),
313            ..Default::default()
314        };
315        let readiness_probe = Probe {
316            http_get: Some(HTTPGetAction {
317                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
318                path: Some("/api/readyz".into()),
319                ..Default::default()
320            }),
321            failure_threshold: Some(3),
322            period_seconds: Some(10),
323            success_threshold: Some(1),
324            timeout_seconds: Some(1),
325            ..Default::default()
326        };
327        let liveness_probe = Probe {
328            http_get: Some(HTTPGetAction {
329                port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
330                path: Some("/api/livez".into()),
331                ..Default::default()
332            }),
333            failure_threshold: Some(3),
334            initial_delay_seconds: Some(8),
335            period_seconds: Some(10),
336            success_threshold: Some(1),
337            timeout_seconds: Some(1),
338            ..Default::default()
339        };
340
341        let container = Container {
342            name: "balancerd".to_owned(),
343            image: Some(balancer.spec.balancerd_image_ref.clone()),
344            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
345            ports: Some(ports),
346            args: Some(args),
347            startup_probe: Some(startup_probe),
348            readiness_probe: Some(readiness_probe),
349            liveness_probe: Some(liveness_probe),
350            resources: balancer
351                .spec
352                .resource_requirements
353                .clone()
354                .or_else(|| self.config.balancerd_default_resources.clone()),
355            security_context: security_context.clone(),
356            volume_mounts: Some(volume_mounts),
357            ..Default::default()
358        };
359
360        let deployment_spec = DeploymentSpec {
361            replicas: Some(balancer.replicas()),
362            selector: LabelSelector {
363                match_labels: Some(pod_template_labels.clone()),
364                ..Default::default()
365            },
366            strategy: Some(DeploymentStrategy {
367                rolling_update: Some(RollingUpdateDeployment {
368                    // Allow a complete set of new pods at once, to minimize the
369                    // chances of a new connection going to a pod that will be
370                    // immediately drained
371                    max_surge: Some(IntOrString::String("100%".into())),
372                    ..Default::default()
373                }),
374                ..Default::default()
375            }),
376            template: PodTemplateSpec {
377                // not using managed_resource_meta because the pod should be
378                // owned by the deployment, not the materialize instance
379                metadata: Some(ObjectMeta {
380                    annotations: pod_template_annotations,
381                    labels: Some(pod_template_labels),
382                    ..Default::default()
383                }),
384                spec: Some(PodSpec {
385                    containers: vec![container],
386                    node_selector: Some(
387                        self.config
388                            .balancerd_node_selector
389                            .iter()
390                            .map(|selector| (selector.key.clone(), selector.value.clone()))
391                            .collect(),
392                    ),
393                    affinity: self.config.balancerd_affinity.clone(),
394                    tolerations: self.config.balancerd_tolerations.clone(),
395                    security_context: Some(PodSecurityContext {
396                        fs_group: Some(999),
397                        run_as_user: Some(999),
398                        run_as_group: Some(999),
399                        ..Default::default()
400                    }),
401                    scheduler_name: self.config.scheduler_name.clone(),
402                    volumes: Some(volumes),
403                    ..Default::default()
404                }),
405            },
406            ..Default::default()
407        };
408
409        Ok(Deployment {
410            metadata: balancer.managed_resource_meta(balancer.deployment_name()),
411            spec: Some(deployment_spec),
412            status: None,
413        })
414    }
415
416    fn create_service_object(&self, balancer: &Balancer) -> Service {
417        let selector =
418            btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
419
420        let ports = vec![
421            ServicePort {
422                name: Some("http".to_string()),
423                protocol: Some("TCP".to_string()),
424                port: self.config.balancerd_http_port.into(),
425                target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
426                ..Default::default()
427            },
428            ServicePort {
429                name: Some("pgwire".to_string()),
430                protocol: Some("TCP".to_string()),
431                port: self.config.balancerd_sql_port.into(),
432                target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
433                ..Default::default()
434            },
435        ];
436
437        let spec = ServiceSpec {
438            type_: Some("ClusterIP".to_string()),
439            cluster_ip: Some("None".to_string()),
440            selector: Some(selector),
441            ports: Some(ports),
442            ..Default::default()
443        };
444
445        Service {
446            metadata: balancer.managed_resource_meta(balancer.service_name()),
447            spec: Some(spec),
448            status: None,
449        }
450    }
451}
452
453#[async_trait::async_trait]
454impl k8s_controller::Context for Context {
455    type Resource = Balancer;
456    type Error = Error;
457
458    #[instrument(fields())]
459    async fn apply(
460        &self,
461        client: Client,
462        balancer: &Self::Resource,
463    ) -> Result<Option<Action>, Self::Error> {
464        if balancer.status.is_none() {
465            let balancer_api: Api<Balancer> =
466                Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
467            let mut new_balancer = balancer.clone();
468            new_balancer.status = Some(balancer.status());
469            balancer_api
470                .replace_status(
471                    &balancer.name_unchecked(),
472                    &PostParams::default(),
473                    serde_json::to_vec(&new_balancer).unwrap(),
474                )
475                .await?;
476            // Updating the status should trigger a reconciliation
477            // which will include a status this time.
478            return Ok(None);
479        }
480
481        let namespace = balancer.namespace().unwrap();
482        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
483        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
484        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
485
486        if let Some(external_certificate) = self.create_external_certificate_object(balancer) {
487            trace!("creating new balancerd external certificate");
488            apply_resource(&certificate_api, &external_certificate).await?;
489        }
490
491        let deployment = self.create_deployment_object(balancer)?;
492        trace!("creating new balancerd deployment");
493        apply_resource(&deployment_api, &deployment).await?;
494
495        let service = self.create_service_object(balancer);
496        trace!("creating new balancerd service");
497        apply_resource(&service_api, &service).await?;
498
499        self.sync_deployment_status(&client, balancer).await?;
500
501        Ok(None)
502    }
503}