Skip to main content

mz_orchestratord/controller/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use k8s_openapi::{
11    api::{
12        apps::v1::{Deployment, DeploymentSpec},
13        core::v1::{
14            Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
15            EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16            ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
17            ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
18        },
19        networking::v1::{
20            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21            NetworkPolicySpec,
22        },
23    },
24    apimachinery::pkg::{
25        apis::meta::v1::{Condition, LabelSelector, Time},
26        util::intstr::IntOrString,
27    },
28    jiff::Timestamp,
29};
30use kube::{
31    Api, Client, Resource, ResourceExt,
32    api::{DeleteParams, ObjectMeta, PostParams},
33    runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
34};
35use maplit::btreemap;
36use serde::Serialize;
37use tracing::{trace, warn};
38
39use crate::{
40    Error,
41    k8s::{apply_resource, get_resource, replace_resource},
42    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
43};
44use mz_cloud_resources::crd::{
45    ManagedResource,
46    console::v1alpha1::{Console, HttpConnectionScheme},
47    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
48};
49use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
50use mz_ore::{cli::KeyValueArg, instrument};
51use mz_server_core::listeners::AuthenticatorKind;
52
/// Operator-level settings used when rendering the Kubernetes objects for a
/// `Console` resource.
pub struct Config {
    /// When true, the console container is given a restrictive
    /// `SecurityContext` (non-root, all capabilities dropped, default seccomp
    /// profile, no privilege escalation).
    pub enable_security_context: bool,
    /// NOTE(review): not read anywhere in the code visible in this file;
    /// presumably consumed elsewhere — confirm.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy set on the console container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name copied into the console pod spec, if any.
    pub scheduler_name: Option<String>,
    /// Key/value pairs turned into the pod's `node_selector` map.
    pub console_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied into the console pod spec, if any.
    pub console_affinity: Option<Affinity>,
    /// Tolerations copied into the console pod spec, if any.
    pub console_tolerations: Option<Vec<Toleration>>,
    /// Resource requirements used for the console container when the
    /// `Console` spec does not provide its own.
    pub console_default_resources: Option<ResourceRequirements>,
    /// Whether an ingress `NetworkPolicy` is generated for the console pods.
    pub network_policies_ingress_enabled: bool,
    /// CIDR blocks listed as allowed sources in the ingress `NetworkPolicy`.
    pub network_policies_ingress_cidrs: Vec<String>,

    /// Default certificate specs; the `console_external` entry is used for
    /// the console's external certificate.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port used for the console container port, its probes, the service and
    /// the ingress network policy.
    pub console_http_port: u16,
    /// Port of the balancerd service used to build the `MZ_ENDPOINT`
    /// environment variable.
    pub balancerd_http_port: u16,
}
71
/// Payload serialized to JSON and stored under the `app-config.json` key of
/// the console `ConfigMap`.
#[derive(Serialize)]
struct AppConfig {
    /// Text after the last `:` of the console image reference.
    version: String,
    /// Authentication settings exposed to the console.
    auth: AppConfigAuth,
    /// DNS names taken from the `Console` spec's balancerd section; omitted
    /// from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    balancerd_dns_names: Option<Vec<String>>,
}
79
/// Authentication section of [`AppConfig`].
#[derive(Serialize)]
struct AppConfigAuth {
    /// Authenticator kind copied from the `Console` spec.
    mode: AuthenticatorKind,
}
84
/// Controller context for `Console` resources; holds the operator
/// configuration used to render and reconcile the console's Kubernetes
/// objects.
pub struct Context {
    /// Operator-level settings applied to every console.
    config: Config,
}
88
89impl Context {
90    pub fn new(config: Config) -> Self {
91        Self { config }
92    }
93
94    async fn sync_deployment_status(
95        &self,
96        client: &Client,
97        console: &Console,
98    ) -> Result<(), Error> {
99        let namespace = console.namespace();
100        let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
101        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
102
103        let Some(deployment) = get_resource(&deployment_api, &console.deployment_name()).await?
104        else {
105            return Ok(());
106        };
107
108        let Some(deployment_conditions) = &deployment
109            .status
110            .as_ref()
111            .and_then(|status| status.conditions.as_ref())
112        else {
113            // if the deployment doesn't have any conditions set yet, there
114            // is nothing to sync
115            return Ok(());
116        };
117
118        let ready = deployment_conditions
119            .iter()
120            .any(|condition| condition.type_ == "Available" && condition.status == "True");
121        let ready_str = if ready { "True" } else { "False" };
122
123        let mut status = console.status.clone().unwrap();
124        if status
125            .conditions
126            .iter()
127            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
128        {
129            // if the deployment status is already set correctly, we don't
130            // need to set it again (this prevents us from getting stuck in
131            // a reconcile loop)
132            return Ok(());
133        }
134
135        status.conditions = vec![Condition {
136            type_: "Ready".to_string(),
137            status: ready_str.to_string(),
138            last_transition_time: Time(Timestamp::now()),
139            message: format!(
140                "console deployment is{} ready",
141                if ready { "" } else { " not" }
142            ),
143            observed_generation: None,
144            reason: "DeploymentStatus".to_string(),
145        }];
146        let mut new_console = console.clone();
147        new_console.status = Some(status);
148
149        console_api
150            .replace_status(
151                &console.name_unchecked(),
152                &PostParams::default(),
153                &new_console,
154            )
155            .await?;
156
157        Ok(())
158    }
159
160    fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
161        let mut network_policies = Vec::new();
162        if self.config.network_policies_ingress_enabled {
163            let console_label_selector = LabelSelector {
164                match_labels: Some(
165                    console
166                        .default_labels()
167                        .into_iter()
168                        .chain([("materialize.cloud/app".to_owned(), console.app_name())])
169                        .collect(),
170                ),
171                ..Default::default()
172            };
173            network_policies.extend([NetworkPolicy {
174                metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
175                spec: Some(NetworkPolicySpec {
176                    ingress: Some(vec![NetworkPolicyIngressRule {
177                        from: Some(
178                            self.config
179                                .network_policies_ingress_cidrs
180                                .iter()
181                                .map(|cidr| NetworkPolicyPeer {
182                                    ip_block: Some(IPBlock {
183                                        cidr: cidr.to_owned(),
184                                        except: None,
185                                    }),
186                                    ..Default::default()
187                                })
188                                .collect(),
189                        ),
190                        ports: Some(vec![NetworkPolicyPort {
191                            port: Some(IntOrString::Int(self.config.console_http_port.into())),
192                            protocol: Some("TCP".to_string()),
193                            ..Default::default()
194                        }]),
195                        ..Default::default()
196                    }]),
197                    pod_selector: Some(console_label_selector),
198                    policy_types: Some(vec!["Ingress".to_owned()]),
199                    ..Default::default()
200                }),
201            }]);
202        }
203        network_policies
204    }
205
206    fn create_console_external_certificate(
207        &self,
208        console: &Console,
209    ) -> anyhow::Result<Option<Certificate>> {
210        create_certificate(
211            self.config
212                .default_certificate_specs
213                .console_external
214                .clone(),
215            console,
216            console.spec.external_certificate_spec.clone(),
217            console.external_certificate_name(),
218            console.external_certificate_secret_name(),
219            None,
220            CertificatePrivateKeyAlgorithm::Ecdsa,
221            Some(256),
222        )
223    }
224
225    fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
226        let balancerd_dns_names = console.spec.balancerd.dns_names.clone();
227        let version: String = console
228            .spec
229            .console_image_ref
230            .rsplitn(2, ':')
231            .next()
232            .expect("at least one chunk, even if empty")
233            .to_owned();
234        let app_config_json = serde_json::to_string(&AppConfig {
235            version,
236            balancerd_dns_names,
237            auth: AppConfigAuth {
238                mode: console.spec.authenticator_kind,
239            },
240        })
241        .expect("known valid");
242        ConfigMap {
243            binary_data: None,
244            data: Some(btreemap! {
245                "app-config.json".to_owned() => app_config_json,
246            }),
247            immutable: None,
248            metadata: console.managed_resource_meta(console.configmap_name()),
249        }
250    }
251
252    fn create_console_deployment_object(&self, console: &Console) -> Deployment {
253        let mut pod_template_labels = console.default_labels();
254        pod_template_labels.insert(
255            "materialize.cloud/name".to_owned(),
256            console.deployment_name(),
257        );
258        pod_template_labels.insert("app".to_owned(), "console".to_string());
259        pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
260
261        let ports = vec![ContainerPort {
262            container_port: self.config.console_http_port.into(),
263            name: Some("http".into()),
264            protocol: Some("TCP".into()),
265            ..Default::default()
266        }];
267
268        let scheme = match console.spec.balancerd.scheme {
269            HttpConnectionScheme::Http => "http",
270            HttpConnectionScheme::Https => "https",
271        };
272        let mut env = vec![EnvVar {
273            name: "MZ_ENDPOINT".to_string(),
274            value: Some(format!(
275                "{}://{}.{}.svc.cluster.local:{}",
276                scheme,
277                console.spec.balancerd.service_name,
278                console.spec.balancerd.namespace,
279                self.config.balancerd_http_port,
280            )),
281            ..Default::default()
282        }];
283        let mut volumes = vec![Volume {
284            name: "app-config".to_string(),
285            config_map: Some(ConfigMapVolumeSource {
286                name: console.configmap_name(),
287                default_mode: Some(256),
288                optional: Some(false),
289                items: Some(vec![KeyToPath {
290                    key: "app-config.json".to_string(),
291                    path: "app-config.json".to_string(),
292                    ..Default::default()
293                }]),
294            }),
295            ..Default::default()
296        }];
297        let mut volume_mounts = vec![VolumeMount {
298            name: "app-config".to_string(),
299            mount_path: "/usr/share/nginx/html/app-config".to_string(),
300            ..Default::default()
301        }];
302
303        let scheme = if issuer_ref_defined(
304            &self.config.default_certificate_specs.console_external,
305            &console.spec.external_certificate_spec,
306        ) {
307            volumes.push(Volume {
308                name: "external-certificate".to_owned(),
309                secret: Some(SecretVolumeSource {
310                    default_mode: Some(0o400),
311                    secret_name: Some(console.external_certificate_secret_name()),
312                    items: None,
313                    optional: Some(false),
314                }),
315                ..Default::default()
316            });
317            volume_mounts.push(VolumeMount {
318                name: "external-certificate".to_owned(),
319                mount_path: "/nginx/tls".to_owned(),
320                read_only: Some(true),
321                ..Default::default()
322            });
323            env.push(EnvVar {
324                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
325                value: Some(format!(
326                    "listen {} ssl;
327ssl_certificate /nginx/tls/tls.crt;
328ssl_certificate_key /nginx/tls/tls.key;",
329                    self.config.console_http_port
330                )),
331                ..Default::default()
332            });
333            Some("HTTPS".to_owned())
334        } else {
335            env.push(EnvVar {
336                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
337                value: Some(format!("listen {};", self.config.console_http_port)),
338                ..Default::default()
339            });
340            Some("HTTP".to_owned())
341        };
342
343        let probe = Probe {
344            http_get: Some(HTTPGetAction {
345                path: Some("/".to_string()),
346                port: IntOrString::Int(self.config.console_http_port.into()),
347                scheme,
348                ..Default::default()
349            }),
350            ..Default::default()
351        };
352
353        let security_context = if self.config.enable_security_context {
354            // Since we want to adhere to the most restrictive security context, all
355            // of these fields have to be set how they are.
356            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
357            Some(SecurityContext {
358                run_as_non_root: Some(true),
359                capabilities: Some(Capabilities {
360                    drop: Some(vec!["ALL".to_string()]),
361                    ..Default::default()
362                }),
363                seccomp_profile: Some(SeccompProfile {
364                    type_: "RuntimeDefault".to_string(),
365                    ..Default::default()
366                }),
367                allow_privilege_escalation: Some(false),
368                ..Default::default()
369            })
370        } else {
371            None
372        };
373
374        let container = Container {
375            name: "console".to_owned(),
376            image: Some(console.spec.console_image_ref.clone()),
377            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
378            ports: Some(ports),
379            env: Some(env),
380            startup_probe: Some(Probe {
381                period_seconds: Some(1),
382                failure_threshold: Some(10),
383                ..probe.clone()
384            }),
385            readiness_probe: Some(Probe {
386                period_seconds: Some(30),
387                failure_threshold: Some(1),
388                ..probe.clone()
389            }),
390            liveness_probe: Some(Probe {
391                period_seconds: Some(30),
392                ..probe.clone()
393            }),
394            resources: console
395                .spec
396                .resource_requirements
397                .clone()
398                .or_else(|| self.config.console_default_resources.clone()),
399            security_context,
400            volume_mounts: Some(volume_mounts),
401            ..Default::default()
402        };
403
404        let deployment_spec = DeploymentSpec {
405            replicas: Some(console.replicas()),
406            selector: LabelSelector {
407                match_labels: Some(pod_template_labels.clone()),
408                ..Default::default()
409            },
410            template: PodTemplateSpec {
411                // not using managed_resource_meta because the pod should be
412                // owned by the deployment, not the materialize instance
413                metadata: Some(ObjectMeta {
414                    labels: Some(pod_template_labels),
415                    ..Default::default()
416                }),
417                spec: Some(PodSpec {
418                    containers: vec![container],
419                    node_selector: Some(
420                        self.config
421                            .console_node_selector
422                            .iter()
423                            .map(|selector| (selector.key.clone(), selector.value.clone()))
424                            .collect(),
425                    ),
426                    affinity: self.config.console_affinity.clone(),
427                    tolerations: self.config.console_tolerations.clone(),
428                    scheduler_name: self.config.scheduler_name.clone(),
429                    volumes: Some(volumes),
430                    security_context: Some(PodSecurityContext {
431                        fs_group: Some(101),
432                        ..Default::default()
433                    }),
434                    ..Default::default()
435                }),
436            },
437            ..Default::default()
438        };
439
440        Deployment {
441            metadata: ObjectMeta {
442                ..console.managed_resource_meta(console.deployment_name())
443            },
444            spec: Some(deployment_spec),
445            status: None,
446        }
447    }
448
449    fn create_console_service_object(&self, console: &Console) -> Service {
450        let selector =
451            btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
452
453        let ports = vec![ServicePort {
454            name: Some("http".to_string()),
455            protocol: Some("TCP".to_string()),
456            port: self.config.console_http_port.into(),
457            target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
458            ..Default::default()
459        }];
460
461        let spec = ServiceSpec {
462            type_: Some("ClusterIP".to_string()),
463            cluster_ip: Some("None".to_string()),
464            selector: Some(selector),
465            ports: Some(ports),
466            ..Default::default()
467        };
468
469        Service {
470            metadata: console.managed_resource_meta(console.service_name()),
471            spec: Some(spec),
472            status: None,
473        }
474    }
475
476    // TODO: remove this once everyone is upgraded to an orchestratord
477    // version with the separate console operator
478    async fn fix_deployment(
479        &self,
480        deployment_api: &Api<Deployment>,
481        new_deployment: &Deployment,
482    ) -> Result<(), Error> {
483        let Some(mut existing_deployment) =
484            get_resource(deployment_api, &new_deployment.name_unchecked()).await?
485        else {
486            return Ok(());
487        };
488
489        if existing_deployment.spec.as_ref().unwrap().selector
490            == new_deployment.spec.as_ref().unwrap().selector
491        {
492            return Ok(());
493        }
494
495        warn!("found existing deployment with old label selector, fixing");
496
497        // this is sufficient because the new labels are a superset of the
498        // old labels, so the existing label selector should still be valid
499        existing_deployment
500            .spec
501            .as_mut()
502            .unwrap()
503            .template
504            .metadata
505            .as_mut()
506            .unwrap()
507            .labels = new_deployment
508            .spec
509            .as_ref()
510            .unwrap()
511            .template
512            .metadata
513            .as_ref()
514            .unwrap()
515            .labels
516            .clone();
517
518        // using await_condition is not ideal in a controller loop, but this
519        // is very temporary and will only ever happen once, so this feels
520        // simpler than trying to introduce an entire state machine here
521        replace_resource(deployment_api, &existing_deployment).await?;
522        await_condition(
523            deployment_api.clone(),
524            &existing_deployment.name_unchecked(),
525            |deployment: Option<&Deployment>| {
526                let observed_generation = deployment
527                    .and_then(|deployment| deployment.status.as_ref())
528                    .and_then(|status| status.observed_generation)
529                    .unwrap_or(0);
530                let current_generation = deployment
531                    .and_then(|deployment| deployment.meta().generation)
532                    .unwrap_or(0);
533                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
534                observed_generation == current_generation
535                    && current_generation > previous_generation
536            },
537        )
538        .await
539        .map_err(|e| anyhow::anyhow!(e))?;
540        await_condition(
541            deployment_api.clone(),
542            &existing_deployment.name_unchecked(),
543            is_deployment_completed(),
544        )
545        .await
546        .map_err(|e| anyhow::anyhow!(e))?;
547
548        // delete the deployment but leave the pods around (via
549        // DeleteParams::orphan)
550        match kube::runtime::wait::delete::delete_and_finalize(
551            deployment_api.clone(),
552            &existing_deployment.name_unchecked(),
553            &DeleteParams::orphan(),
554        )
555        .await
556        {
557            Ok(_) => {}
558            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
559                if e.code == 404 =>
560            {
561                // the resource already doesn't exist
562            }
563            Err(e) => return Err(anyhow::anyhow!(e).into()),
564        }
565
566        // now, the normal apply of the new deployment (in the main loop)
567        // will take over the existing pods from the old deployment we just
568        // deleted, since we already updated the pod labels to be the same as
569        // the new label selector
570
571        Ok(())
572    }
573}
574
575#[async_trait::async_trait]
576impl k8s_controller::Context for Context {
577    type Resource = Console;
578    type Error = Error;
579
580    #[instrument(fields())]
581    async fn apply(
582        &self,
583        client: Client,
584        console: &Self::Resource,
585    ) -> Result<Option<Action>, Self::Error> {
586        if console.status.is_none() {
587            let console_api: Api<Console> =
588                Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
589            let mut new_console = console.clone();
590            new_console.status = Some(console.status());
591            console_api
592                .replace_status(
593                    &console.name_unchecked(),
594                    &PostParams::default(),
595                    &new_console,
596                )
597                .await?;
598            // Updating the status should trigger a reconciliation
599            // which will include a status this time.
600            return Ok(None);
601        }
602
603        let namespace = console.namespace();
604        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
605        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
606        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
607        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
608        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
609
610        trace!("creating new network policies");
611        let network_policies = self.create_network_policies(console);
612        for network_policy in &network_policies {
613            apply_resource(&network_policy_api, network_policy).await?;
614        }
615
616        trace!("creating new console configmap");
617        let console_configmap = self.create_console_app_configmap_object(console);
618        apply_resource(&configmap_api, &console_configmap).await?;
619
620        trace!("creating new console deployment");
621        let console_deployment = self.create_console_deployment_object(console);
622        self.fix_deployment(&deployment_api, &console_deployment)
623            .await?;
624        apply_resource(&deployment_api, &console_deployment).await?;
625
626        trace!("creating new console service");
627        let console_service = self.create_console_service_object(console);
628        apply_resource(&service_api, &console_service).await?;
629
630        let console_external_certificate = self.create_console_external_certificate(console)?;
631        if let Some(certificate) = &console_external_certificate {
632            trace!("creating new console external certificate");
633            apply_resource(&certificate_api, certificate).await?;
634        }
635
636        self.sync_deployment_status(&client, console).await?;
637
638        Ok(None)
639    }
640}