Skip to main content

mz_orchestratord/controller/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use k8s_openapi::{
11    api::{
12        apps::v1::{Deployment, DeploymentSpec},
13        core::v1::{
14            Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
15            EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16            ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
17            ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
18        },
19        networking::v1::{
20            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21            NetworkPolicySpec,
22        },
23    },
24    apimachinery::pkg::{
25        apis::meta::v1::{Condition, LabelSelector, Time},
26        util::intstr::IntOrString,
27    },
28    jiff::Timestamp,
29};
30use kube::{
31    Api, Client, Resource, ResourceExt,
32    api::{DeleteParams, ObjectMeta, PostParams},
33    runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
34};
35use maplit::btreemap;
36use serde::Serialize;
37use tracing::{trace, warn};
38
39use crate::{
40    Error,
41    k8s::{apply_resource, get_resource, replace_resource},
42    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
43};
44use mz_cloud_resources::crd::{
45    ManagedResource,
46    console::v1alpha1::{Console, HttpConnectionScheme},
47    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
48};
49use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
50use mz_ore::{cli::KeyValueArg, instrument};
51use mz_server_core::listeners::AuthenticatorKind;
52
53pub struct Config {
54    pub enable_security_context: bool,
55    pub enable_prometheus_scrape_annotations: bool,
56
57    pub image_pull_policy: KubernetesImagePullPolicy,
58    pub scheduler_name: Option<String>,
59    pub console_node_selector: Vec<KeyValueArg<String, String>>,
60    pub console_affinity: Option<Affinity>,
61    pub console_tolerations: Option<Vec<Toleration>>,
62    pub console_default_resources: Option<ResourceRequirements>,
63    pub network_policies_ingress_enabled: bool,
64    pub network_policies_ingress_cidrs: Vec<String>,
65
66    pub default_certificate_specs: DefaultCertificateSpecs,
67
68    pub console_http_port: u16,
69    pub balancerd_http_port: u16,
70}
71
72#[derive(Serialize)]
73struct AppConfig {
74    version: String,
75    auth: AppConfigAuth,
76}
77
78#[derive(Serialize)]
79struct AppConfigAuth {
80    mode: AuthenticatorKind,
81}
82
83pub struct Context {
84    config: Config,
85}
86
87impl Context {
88    pub fn new(config: Config) -> Self {
89        Self { config }
90    }
91
92    async fn sync_deployment_status(
93        &self,
94        client: &Client,
95        console: &Console,
96    ) -> Result<(), Error> {
97        let namespace = console.namespace();
98        let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
99        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
100
101        let Some(deployment) = get_resource(&deployment_api, &console.deployment_name()).await?
102        else {
103            return Ok(());
104        };
105
106        let Some(deployment_conditions) = &deployment
107            .status
108            .as_ref()
109            .and_then(|status| status.conditions.as_ref())
110        else {
111            // if the deployment doesn't have any conditions set yet, there
112            // is nothing to sync
113            return Ok(());
114        };
115
116        let ready = deployment_conditions
117            .iter()
118            .any(|condition| condition.type_ == "Available" && condition.status == "True");
119        let ready_str = if ready { "True" } else { "False" };
120
121        let mut status = console.status.clone().unwrap();
122        if status
123            .conditions
124            .iter()
125            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
126        {
127            // if the deployment status is already set correctly, we don't
128            // need to set it again (this prevents us from getting stuck in
129            // a reconcile loop)
130            return Ok(());
131        }
132
133        status.conditions = vec![Condition {
134            type_: "Ready".to_string(),
135            status: ready_str.to_string(),
136            last_transition_time: Time(Timestamp::now()),
137            message: format!(
138                "console deployment is{} ready",
139                if ready { "" } else { " not" }
140            ),
141            observed_generation: None,
142            reason: "DeploymentStatus".to_string(),
143        }];
144        let mut new_console = console.clone();
145        new_console.status = Some(status);
146
147        console_api
148            .replace_status(
149                &console.name_unchecked(),
150                &PostParams::default(),
151                &new_console,
152            )
153            .await?;
154
155        Ok(())
156    }
157
158    fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
159        let mut network_policies = Vec::new();
160        if self.config.network_policies_ingress_enabled {
161            let console_label_selector = LabelSelector {
162                match_labels: Some(
163                    console
164                        .default_labels()
165                        .into_iter()
166                        .chain([("materialize.cloud/app".to_owned(), console.app_name())])
167                        .collect(),
168                ),
169                ..Default::default()
170            };
171            network_policies.extend([NetworkPolicy {
172                metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
173                spec: Some(NetworkPolicySpec {
174                    ingress: Some(vec![NetworkPolicyIngressRule {
175                        from: Some(
176                            self.config
177                                .network_policies_ingress_cidrs
178                                .iter()
179                                .map(|cidr| NetworkPolicyPeer {
180                                    ip_block: Some(IPBlock {
181                                        cidr: cidr.to_owned(),
182                                        except: None,
183                                    }),
184                                    ..Default::default()
185                                })
186                                .collect(),
187                        ),
188                        ports: Some(vec![NetworkPolicyPort {
189                            port: Some(IntOrString::Int(self.config.console_http_port.into())),
190                            protocol: Some("TCP".to_string()),
191                            ..Default::default()
192                        }]),
193                        ..Default::default()
194                    }]),
195                    pod_selector: Some(console_label_selector),
196                    policy_types: Some(vec!["Ingress".to_owned()]),
197                    ..Default::default()
198                }),
199            }]);
200        }
201        network_policies
202    }
203
204    fn create_console_external_certificate(
205        &self,
206        console: &Console,
207    ) -> anyhow::Result<Option<Certificate>> {
208        create_certificate(
209            self.config
210                .default_certificate_specs
211                .console_external
212                .clone(),
213            console,
214            console.spec.external_certificate_spec.clone(),
215            console.external_certificate_name(),
216            console.external_certificate_secret_name(),
217            None,
218            CertificatePrivateKeyAlgorithm::Ecdsa,
219            Some(256),
220        )
221    }
222
223    fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
224        let version: String = console
225            .spec
226            .console_image_ref
227            .rsplitn(2, ':')
228            .next()
229            .expect("at least one chunk, even if empty")
230            .to_owned();
231        let app_config_json = serde_json::to_string(&AppConfig {
232            version,
233            auth: AppConfigAuth {
234                mode: console.spec.authenticator_kind,
235            },
236        })
237        .expect("known valid");
238        ConfigMap {
239            binary_data: None,
240            data: Some(btreemap! {
241                "app-config.json".to_owned() => app_config_json,
242            }),
243            immutable: None,
244            metadata: console.managed_resource_meta(console.configmap_name()),
245        }
246    }
247
248    fn create_console_deployment_object(&self, console: &Console) -> Deployment {
249        let mut pod_template_labels = console.default_labels();
250        pod_template_labels.insert(
251            "materialize.cloud/name".to_owned(),
252            console.deployment_name(),
253        );
254        pod_template_labels.insert("app".to_owned(), "console".to_string());
255        pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
256
257        let ports = vec![ContainerPort {
258            container_port: self.config.console_http_port.into(),
259            name: Some("http".into()),
260            protocol: Some("TCP".into()),
261            ..Default::default()
262        }];
263
264        let scheme = match console.spec.balancerd.scheme {
265            HttpConnectionScheme::Http => "http",
266            HttpConnectionScheme::Https => "https",
267        };
268        let mut env = vec![EnvVar {
269            name: "MZ_ENDPOINT".to_string(),
270            value: Some(format!(
271                "{}://{}.{}.svc.cluster.local:{}",
272                scheme,
273                console.spec.balancerd.service_name,
274                console.spec.balancerd.namespace,
275                self.config.balancerd_http_port,
276            )),
277            ..Default::default()
278        }];
279        let mut volumes = vec![Volume {
280            name: "app-config".to_string(),
281            config_map: Some(ConfigMapVolumeSource {
282                name: console.configmap_name(),
283                default_mode: Some(256),
284                optional: Some(false),
285                items: Some(vec![KeyToPath {
286                    key: "app-config.json".to_string(),
287                    path: "app-config.json".to_string(),
288                    ..Default::default()
289                }]),
290            }),
291            ..Default::default()
292        }];
293        let mut volume_mounts = vec![VolumeMount {
294            name: "app-config".to_string(),
295            mount_path: "/usr/share/nginx/html/app-config".to_string(),
296            ..Default::default()
297        }];
298
299        let scheme = if issuer_ref_defined(
300            &self.config.default_certificate_specs.console_external,
301            &console.spec.external_certificate_spec,
302        ) {
303            volumes.push(Volume {
304                name: "external-certificate".to_owned(),
305                secret: Some(SecretVolumeSource {
306                    default_mode: Some(0o400),
307                    secret_name: Some(console.external_certificate_secret_name()),
308                    items: None,
309                    optional: Some(false),
310                }),
311                ..Default::default()
312            });
313            volume_mounts.push(VolumeMount {
314                name: "external-certificate".to_owned(),
315                mount_path: "/nginx/tls".to_owned(),
316                read_only: Some(true),
317                ..Default::default()
318            });
319            env.push(EnvVar {
320                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
321                value: Some(format!(
322                    "listen {} ssl;
323ssl_certificate /nginx/tls/tls.crt;
324ssl_certificate_key /nginx/tls/tls.key;",
325                    self.config.console_http_port
326                )),
327                ..Default::default()
328            });
329            Some("HTTPS".to_owned())
330        } else {
331            env.push(EnvVar {
332                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
333                value: Some(format!("listen {};", self.config.console_http_port)),
334                ..Default::default()
335            });
336            Some("HTTP".to_owned())
337        };
338
339        let probe = Probe {
340            http_get: Some(HTTPGetAction {
341                path: Some("/".to_string()),
342                port: IntOrString::Int(self.config.console_http_port.into()),
343                scheme,
344                ..Default::default()
345            }),
346            ..Default::default()
347        };
348
349        let security_context = if self.config.enable_security_context {
350            // Since we want to adhere to the most restrictive security context, all
351            // of these fields have to be set how they are.
352            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
353            Some(SecurityContext {
354                run_as_non_root: Some(true),
355                capabilities: Some(Capabilities {
356                    drop: Some(vec!["ALL".to_string()]),
357                    ..Default::default()
358                }),
359                seccomp_profile: Some(SeccompProfile {
360                    type_: "RuntimeDefault".to_string(),
361                    ..Default::default()
362                }),
363                allow_privilege_escalation: Some(false),
364                ..Default::default()
365            })
366        } else {
367            None
368        };
369
370        let container = Container {
371            name: "console".to_owned(),
372            image: Some(console.spec.console_image_ref.clone()),
373            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
374            ports: Some(ports),
375            env: Some(env),
376            startup_probe: Some(Probe {
377                period_seconds: Some(1),
378                failure_threshold: Some(10),
379                ..probe.clone()
380            }),
381            readiness_probe: Some(Probe {
382                period_seconds: Some(30),
383                failure_threshold: Some(1),
384                ..probe.clone()
385            }),
386            liveness_probe: Some(Probe {
387                period_seconds: Some(30),
388                ..probe.clone()
389            }),
390            resources: console
391                .spec
392                .resource_requirements
393                .clone()
394                .or_else(|| self.config.console_default_resources.clone()),
395            security_context,
396            volume_mounts: Some(volume_mounts),
397            ..Default::default()
398        };
399
400        let deployment_spec = DeploymentSpec {
401            replicas: Some(console.replicas()),
402            selector: LabelSelector {
403                match_labels: Some(pod_template_labels.clone()),
404                ..Default::default()
405            },
406            template: PodTemplateSpec {
407                // not using managed_resource_meta because the pod should be
408                // owned by the deployment, not the materialize instance
409                metadata: Some(ObjectMeta {
410                    labels: Some(pod_template_labels),
411                    ..Default::default()
412                }),
413                spec: Some(PodSpec {
414                    containers: vec![container],
415                    node_selector: Some(
416                        self.config
417                            .console_node_selector
418                            .iter()
419                            .map(|selector| (selector.key.clone(), selector.value.clone()))
420                            .collect(),
421                    ),
422                    affinity: self.config.console_affinity.clone(),
423                    tolerations: self.config.console_tolerations.clone(),
424                    scheduler_name: self.config.scheduler_name.clone(),
425                    volumes: Some(volumes),
426                    security_context: Some(PodSecurityContext {
427                        fs_group: Some(101),
428                        ..Default::default()
429                    }),
430                    ..Default::default()
431                }),
432            },
433            ..Default::default()
434        };
435
436        Deployment {
437            metadata: ObjectMeta {
438                ..console.managed_resource_meta(console.deployment_name())
439            },
440            spec: Some(deployment_spec),
441            status: None,
442        }
443    }
444
445    fn create_console_service_object(&self, console: &Console) -> Service {
446        let selector =
447            btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
448
449        let ports = vec![ServicePort {
450            name: Some("http".to_string()),
451            protocol: Some("TCP".to_string()),
452            port: self.config.console_http_port.into(),
453            target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
454            ..Default::default()
455        }];
456
457        let spec = ServiceSpec {
458            type_: Some("ClusterIP".to_string()),
459            cluster_ip: Some("None".to_string()),
460            selector: Some(selector),
461            ports: Some(ports),
462            ..Default::default()
463        };
464
465        Service {
466            metadata: console.managed_resource_meta(console.service_name()),
467            spec: Some(spec),
468            status: None,
469        }
470    }
471
472    // TODO: remove this once everyone is upgraded to an orchestratord
473    // version with the separate console operator
474    async fn fix_deployment(
475        &self,
476        deployment_api: &Api<Deployment>,
477        new_deployment: &Deployment,
478    ) -> Result<(), Error> {
479        let Some(mut existing_deployment) =
480            get_resource(deployment_api, &new_deployment.name_unchecked()).await?
481        else {
482            return Ok(());
483        };
484
485        if existing_deployment.spec.as_ref().unwrap().selector
486            == new_deployment.spec.as_ref().unwrap().selector
487        {
488            return Ok(());
489        }
490
491        warn!("found existing deployment with old label selector, fixing");
492
493        // this is sufficient because the new labels are a superset of the
494        // old labels, so the existing label selector should still be valid
495        existing_deployment
496            .spec
497            .as_mut()
498            .unwrap()
499            .template
500            .metadata
501            .as_mut()
502            .unwrap()
503            .labels = new_deployment
504            .spec
505            .as_ref()
506            .unwrap()
507            .template
508            .metadata
509            .as_ref()
510            .unwrap()
511            .labels
512            .clone();
513
514        // using await_condition is not ideal in a controller loop, but this
515        // is very temporary and will only ever happen once, so this feels
516        // simpler than trying to introduce an entire state machine here
517        replace_resource(deployment_api, &existing_deployment).await?;
518        await_condition(
519            deployment_api.clone(),
520            &existing_deployment.name_unchecked(),
521            |deployment: Option<&Deployment>| {
522                let observed_generation = deployment
523                    .and_then(|deployment| deployment.status.as_ref())
524                    .and_then(|status| status.observed_generation)
525                    .unwrap_or(0);
526                let current_generation = deployment
527                    .and_then(|deployment| deployment.meta().generation)
528                    .unwrap_or(0);
529                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
530                observed_generation == current_generation
531                    && current_generation > previous_generation
532            },
533        )
534        .await
535        .map_err(|e| anyhow::anyhow!(e))?;
536        await_condition(
537            deployment_api.clone(),
538            &existing_deployment.name_unchecked(),
539            is_deployment_completed(),
540        )
541        .await
542        .map_err(|e| anyhow::anyhow!(e))?;
543
544        // delete the deployment but leave the pods around (via
545        // DeleteParams::orphan)
546        match kube::runtime::wait::delete::delete_and_finalize(
547            deployment_api.clone(),
548            &existing_deployment.name_unchecked(),
549            &DeleteParams::orphan(),
550        )
551        .await
552        {
553            Ok(_) => {}
554            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
555                if e.code == 404 =>
556            {
557                // the resource already doesn't exist
558            }
559            Err(e) => return Err(anyhow::anyhow!(e).into()),
560        }
561
562        // now, the normal apply of the new deployment (in the main loop)
563        // will take over the existing pods from the old deployment we just
564        // deleted, since we already updated the pod labels to be the same as
565        // the new label selector
566
567        Ok(())
568    }
569}
570
571#[async_trait::async_trait]
572impl k8s_controller::Context for Context {
573    type Resource = Console;
574    type Error = Error;
575
576    #[instrument(fields())]
577    async fn apply(
578        &self,
579        client: Client,
580        console: &Self::Resource,
581    ) -> Result<Option<Action>, Self::Error> {
582        if console.status.is_none() {
583            let console_api: Api<Console> =
584                Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
585            let mut new_console = console.clone();
586            new_console.status = Some(console.status());
587            console_api
588                .replace_status(
589                    &console.name_unchecked(),
590                    &PostParams::default(),
591                    &new_console,
592                )
593                .await?;
594            // Updating the status should trigger a reconciliation
595            // which will include a status this time.
596            return Ok(None);
597        }
598
599        let namespace = console.namespace();
600        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
601        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
602        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
603        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
604        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
605
606        trace!("creating new network policies");
607        let network_policies = self.create_network_policies(console);
608        for network_policy in &network_policies {
609            apply_resource(&network_policy_api, network_policy).await?;
610        }
611
612        trace!("creating new console configmap");
613        let console_configmap = self.create_console_app_configmap_object(console);
614        apply_resource(&configmap_api, &console_configmap).await?;
615
616        trace!("creating new console deployment");
617        let console_deployment = self.create_console_deployment_object(console);
618        self.fix_deployment(&deployment_api, &console_deployment)
619            .await?;
620        apply_resource(&deployment_api, &console_deployment).await?;
621
622        trace!("creating new console service");
623        let console_service = self.create_console_service_object(console);
624        apply_resource(&service_api, &console_service).await?;
625
626        let console_external_certificate = self.create_console_external_certificate(console)?;
627        if let Some(certificate) = &console_external_certificate {
628            trace!("creating new console external certificate");
629            apply_resource(&certificate_api, certificate).await?;
630        }
631
632        self.sync_deployment_status(&client, console).await?;
633
634        Ok(None)
635    }
636}