Skip to main content

mz_orchestratord/controller/
console.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use k8s_openapi::{
13    api::{
14        apps::v1::{Deployment, DeploymentSpec},
15        core::v1::{
16            Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
17            EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
18            ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
19            ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
20        },
21        networking::v1::{
22            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
23            NetworkPolicySpec,
24        },
25    },
26    apimachinery::pkg::{
27        apis::meta::v1::{Condition, LabelSelector, Time},
28        util::intstr::IntOrString,
29    },
30    jiff::Timestamp,
31};
32use kube::{
33    Api, Client, Resource, ResourceExt,
34    api::{DeleteParams, ObjectMeta, PostParams},
35    runtime::{
36        conditions::is_deployment_completed,
37        controller::Action,
38        reflector::{ObjectRef, Store},
39        wait::await_condition,
40    },
41};
42use maplit::btreemap;
43use serde::Serialize;
44use tracing::{trace, warn};
45
46use crate::{
47    Error,
48    k8s::{apply_resource, make_reflector, replace_resource},
49    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
50};
51use mz_cloud_resources::crd::{
52    ManagedResource,
53    console::v1alpha1::{Console, HttpConnectionScheme},
54    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
55};
56use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
57use mz_ore::{cli::KeyValueArg, instrument};
58use mz_server_core::listeners::AuthenticatorKind;
59
60pub struct Config {
61    pub enable_security_context: bool,
62    pub enable_prometheus_scrape_annotations: bool,
63
64    pub image_pull_policy: KubernetesImagePullPolicy,
65    pub scheduler_name: Option<String>,
66    pub console_node_selector: Vec<KeyValueArg<String, String>>,
67    pub console_affinity: Option<Affinity>,
68    pub console_tolerations: Option<Vec<Toleration>>,
69    pub console_default_resources: Option<ResourceRequirements>,
70    pub network_policies_ingress_enabled: bool,
71    pub network_policies_ingress_cidrs: Vec<String>,
72
73    pub default_certificate_specs: DefaultCertificateSpecs,
74
75    pub console_http_port: u16,
76    pub balancerd_http_port: u16,
77}
78
79#[derive(Serialize)]
80struct AppConfig {
81    version: String,
82    auth: AppConfigAuth,
83}
84
85#[derive(Serialize)]
86struct AppConfigAuth {
87    mode: AuthenticatorKind,
88}
89
90pub struct Context {
91    config: Config,
92    deployments: Store<Deployment>,
93}
94
95impl Context {
96    pub async fn new(config: Config, client: Client) -> Self {
97        Self {
98            config,
99            deployments: make_reflector(client).await,
100        }
101    }
102
103    async fn sync_deployment_status(
104        &self,
105        client: &Client,
106        console: &Console,
107    ) -> Result<(), kube::Error> {
108        let namespace = console.namespace();
109        let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
110
111        let Some(deployment) = self
112            .deployments
113            .get(&ObjectRef::new(&console.deployment_name()).within(&namespace))
114        else {
115            return Ok(());
116        };
117
118        let Some(deployment_conditions) = &deployment
119            .status
120            .as_ref()
121            .and_then(|status| status.conditions.as_ref())
122        else {
123            // if the deployment doesn't have any conditions set yet, there
124            // is nothing to sync
125            return Ok(());
126        };
127
128        let ready = deployment_conditions
129            .iter()
130            .any(|condition| condition.type_ == "Available" && condition.status == "True");
131        let ready_str = if ready { "True" } else { "False" };
132
133        let mut status = console.status.clone().unwrap();
134        if status
135            .conditions
136            .iter()
137            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
138        {
139            // if the deployment status is already set correctly, we don't
140            // need to set it again (this prevents us from getting stuck in
141            // a reconcile loop)
142            return Ok(());
143        }
144
145        status.conditions = vec![Condition {
146            type_: "Ready".to_string(),
147            status: ready_str.to_string(),
148            last_transition_time: Time(Timestamp::now()),
149            message: format!(
150                "console deployment is{} ready",
151                if ready { "" } else { " not" }
152            ),
153            observed_generation: None,
154            reason: "DeploymentStatus".to_string(),
155        }];
156        let mut new_console = console.clone();
157        new_console.status = Some(status);
158
159        console_api
160            .replace_status(
161                &console.name_unchecked(),
162                &PostParams::default(),
163                &new_console,
164            )
165            .await?;
166
167        Ok(())
168    }
169
170    fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
171        let mut network_policies = Vec::new();
172        if self.config.network_policies_ingress_enabled {
173            let console_label_selector = LabelSelector {
174                match_labels: Some(
175                    console
176                        .default_labels()
177                        .into_iter()
178                        .chain([("materialize.cloud/app".to_owned(), console.app_name())])
179                        .collect(),
180                ),
181                ..Default::default()
182            };
183            network_policies.extend([NetworkPolicy {
184                metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
185                spec: Some(NetworkPolicySpec {
186                    ingress: Some(vec![NetworkPolicyIngressRule {
187                        from: Some(
188                            self.config
189                                .network_policies_ingress_cidrs
190                                .iter()
191                                .map(|cidr| NetworkPolicyPeer {
192                                    ip_block: Some(IPBlock {
193                                        cidr: cidr.to_owned(),
194                                        except: None,
195                                    }),
196                                    ..Default::default()
197                                })
198                                .collect(),
199                        ),
200                        ports: Some(vec![NetworkPolicyPort {
201                            port: Some(IntOrString::Int(self.config.console_http_port.into())),
202                            protocol: Some("TCP".to_string()),
203                            ..Default::default()
204                        }]),
205                        ..Default::default()
206                    }]),
207                    pod_selector: Some(console_label_selector),
208                    policy_types: Some(vec!["Ingress".to_owned()]),
209                    ..Default::default()
210                }),
211            }]);
212        }
213        network_policies
214    }
215
216    fn create_console_external_certificate(
217        &self,
218        console: &Console,
219    ) -> anyhow::Result<Option<Certificate>> {
220        create_certificate(
221            self.config
222                .default_certificate_specs
223                .console_external
224                .clone(),
225            console,
226            console.spec.external_certificate_spec.clone(),
227            console.external_certificate_name(),
228            console.external_certificate_secret_name(),
229            None,
230            CertificatePrivateKeyAlgorithm::Ecdsa,
231            Some(256),
232        )
233    }
234
235    fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
236        let version: String = console
237            .spec
238            .console_image_ref
239            .rsplitn(2, ':')
240            .next()
241            .expect("at least one chunk, even if empty")
242            .to_owned();
243        let app_config_json = serde_json::to_string(&AppConfig {
244            version,
245            auth: AppConfigAuth {
246                mode: console.spec.authenticator_kind,
247            },
248        })
249        .expect("known valid");
250        ConfigMap {
251            binary_data: None,
252            data: Some(btreemap! {
253                "app-config.json".to_owned() => app_config_json,
254            }),
255            immutable: None,
256            metadata: console.managed_resource_meta(console.configmap_name()),
257        }
258    }
259
260    fn create_console_deployment_object(&self, console: &Console) -> Deployment {
261        let mut pod_template_labels = console.default_labels();
262        pod_template_labels.insert(
263            "materialize.cloud/name".to_owned(),
264            console.deployment_name(),
265        );
266        pod_template_labels.insert("app".to_owned(), "console".to_string());
267        pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
268
269        let ports = vec![ContainerPort {
270            container_port: self.config.console_http_port.into(),
271            name: Some("http".into()),
272            protocol: Some("TCP".into()),
273            ..Default::default()
274        }];
275
276        let scheme = match console.spec.balancerd.scheme {
277            HttpConnectionScheme::Http => "http",
278            HttpConnectionScheme::Https => "https",
279        };
280        let mut env = vec![EnvVar {
281            name: "MZ_ENDPOINT".to_string(),
282            value: Some(format!(
283                "{}://{}.{}.svc.cluster.local:{}",
284                scheme,
285                console.spec.balancerd.service_name,
286                console.spec.balancerd.namespace,
287                self.config.balancerd_http_port,
288            )),
289            ..Default::default()
290        }];
291        let mut volumes = vec![Volume {
292            name: "app-config".to_string(),
293            config_map: Some(ConfigMapVolumeSource {
294                name: console.configmap_name(),
295                default_mode: Some(256),
296                optional: Some(false),
297                items: Some(vec![KeyToPath {
298                    key: "app-config.json".to_string(),
299                    path: "app-config.json".to_string(),
300                    ..Default::default()
301                }]),
302            }),
303            ..Default::default()
304        }];
305        let mut volume_mounts = vec![VolumeMount {
306            name: "app-config".to_string(),
307            mount_path: "/usr/share/nginx/html/app-config".to_string(),
308            ..Default::default()
309        }];
310
311        let scheme = if issuer_ref_defined(
312            &self.config.default_certificate_specs.console_external,
313            &console.spec.external_certificate_spec,
314        ) {
315            volumes.push(Volume {
316                name: "external-certificate".to_owned(),
317                secret: Some(SecretVolumeSource {
318                    default_mode: Some(0o400),
319                    secret_name: Some(console.external_certificate_secret_name()),
320                    items: None,
321                    optional: Some(false),
322                }),
323                ..Default::default()
324            });
325            volume_mounts.push(VolumeMount {
326                name: "external-certificate".to_owned(),
327                mount_path: "/nginx/tls".to_owned(),
328                read_only: Some(true),
329                ..Default::default()
330            });
331            env.push(EnvVar {
332                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
333                value: Some(format!(
334                    "listen {} ssl;
335ssl_certificate /nginx/tls/tls.crt;
336ssl_certificate_key /nginx/tls/tls.key;",
337                    self.config.console_http_port
338                )),
339                ..Default::default()
340            });
341            Some("HTTPS".to_owned())
342        } else {
343            env.push(EnvVar {
344                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
345                value: Some(format!("listen {};", self.config.console_http_port)),
346                ..Default::default()
347            });
348            Some("HTTP".to_owned())
349        };
350
351        let probe = Probe {
352            http_get: Some(HTTPGetAction {
353                path: Some("/".to_string()),
354                port: IntOrString::Int(self.config.console_http_port.into()),
355                scheme,
356                ..Default::default()
357            }),
358            ..Default::default()
359        };
360
361        let security_context = if self.config.enable_security_context {
362            // Since we want to adhere to the most restrictive security context, all
363            // of these fields have to be set how they are.
364            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
365            Some(SecurityContext {
366                run_as_non_root: Some(true),
367                capabilities: Some(Capabilities {
368                    drop: Some(vec!["ALL".to_string()]),
369                    ..Default::default()
370                }),
371                seccomp_profile: Some(SeccompProfile {
372                    type_: "RuntimeDefault".to_string(),
373                    ..Default::default()
374                }),
375                allow_privilege_escalation: Some(false),
376                ..Default::default()
377            })
378        } else {
379            None
380        };
381
382        let container = Container {
383            name: "console".to_owned(),
384            image: Some(console.spec.console_image_ref.clone()),
385            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
386            ports: Some(ports),
387            env: Some(env),
388            startup_probe: Some(Probe {
389                period_seconds: Some(1),
390                failure_threshold: Some(10),
391                ..probe.clone()
392            }),
393            readiness_probe: Some(Probe {
394                period_seconds: Some(30),
395                failure_threshold: Some(1),
396                ..probe.clone()
397            }),
398            liveness_probe: Some(Probe {
399                period_seconds: Some(30),
400                ..probe.clone()
401            }),
402            resources: console
403                .spec
404                .resource_requirements
405                .clone()
406                .or_else(|| self.config.console_default_resources.clone()),
407            security_context,
408            volume_mounts: Some(volume_mounts),
409            ..Default::default()
410        };
411
412        let deployment_spec = DeploymentSpec {
413            replicas: Some(console.replicas()),
414            selector: LabelSelector {
415                match_labels: Some(pod_template_labels.clone()),
416                ..Default::default()
417            },
418            template: PodTemplateSpec {
419                // not using managed_resource_meta because the pod should be
420                // owned by the deployment, not the materialize instance
421                metadata: Some(ObjectMeta {
422                    labels: Some(pod_template_labels),
423                    ..Default::default()
424                }),
425                spec: Some(PodSpec {
426                    containers: vec![container],
427                    node_selector: Some(
428                        self.config
429                            .console_node_selector
430                            .iter()
431                            .map(|selector| (selector.key.clone(), selector.value.clone()))
432                            .collect(),
433                    ),
434                    affinity: self.config.console_affinity.clone(),
435                    tolerations: self.config.console_tolerations.clone(),
436                    scheduler_name: self.config.scheduler_name.clone(),
437                    volumes: Some(volumes),
438                    security_context: Some(PodSecurityContext {
439                        fs_group: Some(101),
440                        ..Default::default()
441                    }),
442                    ..Default::default()
443                }),
444            },
445            ..Default::default()
446        };
447
448        Deployment {
449            metadata: ObjectMeta {
450                ..console.managed_resource_meta(console.deployment_name())
451            },
452            spec: Some(deployment_spec),
453            status: None,
454        }
455    }
456
457    fn create_console_service_object(&self, console: &Console) -> Service {
458        let selector =
459            btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
460
461        let ports = vec![ServicePort {
462            name: Some("http".to_string()),
463            protocol: Some("TCP".to_string()),
464            port: self.config.console_http_port.into(),
465            target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
466            ..Default::default()
467        }];
468
469        let spec = ServiceSpec {
470            type_: Some("ClusterIP".to_string()),
471            cluster_ip: Some("None".to_string()),
472            selector: Some(selector),
473            ports: Some(ports),
474            ..Default::default()
475        };
476
477        Service {
478            metadata: console.managed_resource_meta(console.service_name()),
479            spec: Some(spec),
480            status: None,
481        }
482    }
483
484    // TODO: remove this once everyone is upgraded to an orchestratord
485    // version with the separate console operator
486    async fn fix_deployment(
487        &self,
488        deployment_api: &Api<Deployment>,
489        new_deployment: &Deployment,
490    ) -> Result<(), Error> {
491        let Some(mut existing_deployment) = self
492            .deployments
493            .get(
494                &ObjectRef::new(&new_deployment.name_unchecked())
495                    .within(&new_deployment.namespace().unwrap()),
496            )
497            .map(Arc::unwrap_or_clone)
498        else {
499            return Ok(());
500        };
501
502        if existing_deployment.spec.as_ref().unwrap().selector
503            == new_deployment.spec.as_ref().unwrap().selector
504        {
505            return Ok(());
506        }
507
508        warn!("found existing deployment with old label selector, fixing");
509
510        // this is sufficient because the new labels are a superset of the
511        // old labels, so the existing label selector should still be valid
512        existing_deployment
513            .spec
514            .as_mut()
515            .unwrap()
516            .template
517            .metadata
518            .as_mut()
519            .unwrap()
520            .labels = new_deployment
521            .spec
522            .as_ref()
523            .unwrap()
524            .template
525            .metadata
526            .as_ref()
527            .unwrap()
528            .labels
529            .clone();
530
531        // using await_condition is not ideal in a controller loop, but this
532        // is very temporary and will only ever happen once, so this feels
533        // simpler than trying to introduce an entire state machine here
534        replace_resource(deployment_api, &existing_deployment).await?;
535        await_condition(
536            deployment_api.clone(),
537            &existing_deployment.name_unchecked(),
538            |deployment: Option<&Deployment>| {
539                let observed_generation = deployment
540                    .and_then(|deployment| deployment.status.as_ref())
541                    .and_then(|status| status.observed_generation)
542                    .unwrap_or(0);
543                let current_generation = deployment
544                    .and_then(|deployment| deployment.meta().generation)
545                    .unwrap_or(0);
546                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
547                observed_generation == current_generation
548                    && current_generation > previous_generation
549            },
550        )
551        .await
552        .map_err(|e| anyhow::anyhow!(e))?;
553        await_condition(
554            deployment_api.clone(),
555            &existing_deployment.name_unchecked(),
556            is_deployment_completed(),
557        )
558        .await
559        .map_err(|e| anyhow::anyhow!(e))?;
560
561        // delete the deployment but leave the pods around (via
562        // DeleteParams::orphan)
563        match kube::runtime::wait::delete::delete_and_finalize(
564            deployment_api.clone(),
565            &existing_deployment.name_unchecked(),
566            &DeleteParams::orphan(),
567        )
568        .await
569        {
570            Ok(_) => {}
571            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
572                if e.code == 404 =>
573            {
574                // the resource already doesn't exist
575            }
576            Err(e) => return Err(anyhow::anyhow!(e).into()),
577        }
578
579        // now, the normal apply of the new deployment (in the main loop)
580        // will take over the existing pods from the old deployment we just
581        // deleted, since we already updated the pod labels to be the same as
582        // the new label selector
583
584        Ok(())
585    }
586}
587
588#[async_trait::async_trait]
589impl k8s_controller::Context for Context {
590    type Resource = Console;
591    type Error = Error;
592
593    #[instrument(fields())]
594    async fn apply(
595        &self,
596        client: Client,
597        console: &Self::Resource,
598    ) -> Result<Option<Action>, Self::Error> {
599        if console.status.is_none() {
600            let console_api: Api<Console> =
601                Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
602            let mut new_console = console.clone();
603            new_console.status = Some(console.status());
604            console_api
605                .replace_status(
606                    &console.name_unchecked(),
607                    &PostParams::default(),
608                    &new_console,
609                )
610                .await?;
611            // Updating the status should trigger a reconciliation
612            // which will include a status this time.
613            return Ok(None);
614        }
615
616        let namespace = console.namespace();
617        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
618        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
619        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
620        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
621        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
622
623        trace!("creating new network policies");
624        let network_policies = self.create_network_policies(console);
625        for network_policy in &network_policies {
626            apply_resource(&network_policy_api, network_policy).await?;
627        }
628
629        trace!("creating new console configmap");
630        let console_configmap = self.create_console_app_configmap_object(console);
631        apply_resource(&configmap_api, &console_configmap).await?;
632
633        trace!("creating new console deployment");
634        let console_deployment = self.create_console_deployment_object(console);
635        self.fix_deployment(&deployment_api, &console_deployment)
636            .await?;
637        apply_resource(&deployment_api, &console_deployment).await?;
638
639        trace!("creating new console service");
640        let console_service = self.create_console_service_object(console);
641        apply_resource(&service_api, &console_service).await?;
642
643        let console_external_certificate = self.create_console_external_certificate(console)?;
644        if let Some(certificate) = &console_external_certificate {
645            trace!("creating new console external certificate");
646            apply_resource(&certificate_api, certificate).await?;
647        }
648
649        self.sync_deployment_status(&client, console).await?;
650
651        Ok(None)
652    }
653}