mz_orchestratord/controller/materialize/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use k8s_openapi::{
11    api::{
12        apps::v1::{Deployment, DeploymentSpec},
13        core::v1::{
14            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
15            HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16            SeccompProfile, SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec,
17            Volume, VolumeMount,
18        },
19        networking::v1::{
20            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21            NetworkPolicySpec,
22        },
23    },
24    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
25};
26use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
27use maplit::btreemap;
28use mz_server_core::listeners::AuthenticatorKind;
29use serde::Serialize;
30use tracing::trace;
31
32use crate::{
33    controller::materialize::tls::{create_certificate, issuer_ref_defined},
34    k8s::{apply_resource, delete_resource},
35};
36use mz_cloud_resources::crd::{
37    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
38    materialize::v1alpha1::Materialize,
39};
40
41pub struct Resources {
42    network_policies: Vec<NetworkPolicy>,
43    console_configmap: Box<ConfigMap>,
44    console_deployment: Box<Deployment>,
45    console_service: Box<Service>,
46    console_external_certificate: Box<Option<Certificate>>,
47}
48
49impl Resources {
50    pub fn new(
51        config: &super::MaterializeControllerArgs,
52        mz: &Materialize,
53        console_image_ref: &str,
54    ) -> Self {
55        let network_policies = create_network_policies(config, mz);
56        let console_configmap =
57            Box::new(create_console_app_configmap_object(mz, console_image_ref));
58        let console_deployment = Box::new(create_console_deployment_object(
59            config,
60            mz,
61            console_image_ref,
62        ));
63        let console_service = Box::new(create_console_service_object(config, mz));
64        let console_external_certificate =
65            Box::new(create_console_external_certificate(config, mz));
66        Self {
67            network_policies,
68            console_configmap,
69            console_deployment,
70            console_service,
71            console_external_certificate,
72        }
73    }
74
75    pub async fn apply(
76        &self,
77        client: &Client,
78        namespace: &str,
79    ) -> Result<Option<Action>, anyhow::Error> {
80        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
81        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
82        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
83        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
84        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
85
86        trace!("creating new network policies");
87        for network_policy in &self.network_policies {
88            apply_resource(&network_policy_api, network_policy).await?;
89        }
90
91        trace!("creating new console configmap");
92        apply_resource(&configmap_api, &self.console_configmap).await?;
93
94        trace!("creating new console deployment");
95        apply_resource(&deployment_api, &self.console_deployment).await?;
96
97        trace!("creating new console service");
98        apply_resource(&service_api, &self.console_service).await?;
99
100        if let Some(certificate) = &*self.console_external_certificate {
101            trace!("creating new console external certificate");
102            apply_resource(&certificate_api, certificate).await?;
103        }
104
105        Ok(None)
106    }
107
108    pub async fn cleanup(
109        &self,
110        client: &Client,
111        namespace: &str,
112    ) -> Result<Option<Action>, anyhow::Error> {
113        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
114        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
115        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
116        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
117        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
118
119        if let Some(certificate) = &*self.console_external_certificate {
120            trace!("deleting console external certificate");
121            delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
122        }
123
124        trace!("deleting console service");
125        delete_resource(&service_api, &self.console_service.name_unchecked()).await?;
126
127        trace!("deleting console deployment");
128        delete_resource(&deployment_api, &self.console_deployment.name_unchecked()).await?;
129
130        trace!("deleting console configmap");
131        delete_resource(&configmap_api, &self.console_configmap.name_unchecked()).await?;
132
133        trace!("deleting network policies");
134        for network_policy in &self.network_policies {
135            delete_resource(&network_policy_api, &network_policy.name_unchecked()).await?;
136        }
137
138        Ok(None)
139    }
140}
141
142fn create_network_policies(
143    config: &super::MaterializeControllerArgs,
144    mz: &Materialize,
145) -> Vec<NetworkPolicy> {
146    let mut network_policies = Vec::new();
147    if config.network_policies.ingress_enabled {
148        let console_label_selector = LabelSelector {
149            match_labels: Some(
150                mz.default_labels()
151                    .into_iter()
152                    .chain([("materialize.cloud/app".to_owned(), mz.console_app_name())])
153                    .collect(),
154            ),
155            ..Default::default()
156        };
157        network_policies.extend([NetworkPolicy {
158            metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")),
159            spec: Some(NetworkPolicySpec {
160                ingress: Some(vec![NetworkPolicyIngressRule {
161                    from: Some(
162                        config
163                            .network_policies
164                            .ingress_cidrs
165                            .iter()
166                            .map(|cidr| NetworkPolicyPeer {
167                                ip_block: Some(IPBlock {
168                                    cidr: cidr.to_owned(),
169                                    except: None,
170                                }),
171                                ..Default::default()
172                            })
173                            .collect(),
174                    ),
175                    ports: Some(vec![NetworkPolicyPort {
176                        port: Some(IntOrString::Int(config.console_http_port.into())),
177                        protocol: Some("TCP".to_string()),
178                        ..Default::default()
179                    }]),
180                    ..Default::default()
181                }]),
182                pod_selector: Some(console_label_selector),
183                policy_types: Some(vec!["Ingress".to_owned()]),
184                ..Default::default()
185            }),
186        }]);
187    }
188    network_policies
189}
190
191fn create_console_external_certificate(
192    config: &super::MaterializeControllerArgs,
193    mz: &Materialize,
194) -> Option<Certificate> {
195    create_certificate(
196        config.default_certificate_specs.console_external.clone(),
197        mz,
198        mz.spec.console_external_certificate_spec.clone(),
199        mz.console_external_certificate_name(),
200        mz.console_external_certificate_secret_name(),
201        None,
202        CertificatePrivateKeyAlgorithm::Rsa,
203        Some(4096),
204    )
205}
206
207#[derive(Serialize)]
208struct ConsoleAppConfig {
209    version: String,
210    auth: ConsoleAppConfigAuth,
211}
212
213#[derive(Serialize)]
214struct ConsoleAppConfigAuth {
215    mode: AuthenticatorKind,
216}
217
218fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str) -> ConfigMap {
219    let version: String = console_image_ref
220        .rsplitn(2, ':')
221        .next()
222        .expect("at least one chunk, even if empty")
223        .to_owned();
224    let app_config_json = serde_json::to_string(&ConsoleAppConfig {
225        version,
226        auth: ConsoleAppConfigAuth {
227            mode: mz.spec.authenticator_kind,
228        },
229    })
230    .expect("known valid");
231    ConfigMap {
232        binary_data: None,
233        data: Some(btreemap! {
234            "app-config.json".to_owned() => app_config_json,
235        }),
236        immutable: None,
237        metadata: mz.managed_resource_meta(mz.console_configmap_name()),
238    }
239}
240
241fn create_console_deployment_object(
242    config: &super::MaterializeControllerArgs,
243    mz: &Materialize,
244    console_image_ref: &str,
245) -> Deployment {
246    let mut pod_template_labels = mz.default_labels();
247    pod_template_labels.insert(
248        "materialize.cloud/name".to_owned(),
249        mz.console_deployment_name(),
250    );
251    pod_template_labels.insert("app".to_owned(), "console".to_string());
252    pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.console_app_name());
253
254    let ports = vec![ContainerPort {
255        container_port: config.console_http_port.into(),
256        name: Some("http".into()),
257        protocol: Some("TCP".into()),
258        ..Default::default()
259    }];
260
261    let scheme = if issuer_ref_defined(
262        &config.default_certificate_specs.balancerd_external,
263        &mz.spec.balancerd_external_certificate_spec,
264    ) {
265        "https"
266    } else {
267        "http"
268    };
269    let mut env = vec![EnvVar {
270        name: "MZ_ENDPOINT".to_string(),
271        value: Some(format!(
272            "{}://{}.{}.svc.cluster.local:{}",
273            scheme,
274            mz.balancerd_service_name(),
275            mz.namespace(),
276            config.balancerd_http_port,
277        )),
278        ..Default::default()
279    }];
280    let mut volumes = vec![Volume {
281        name: "app-config".to_string(),
282        config_map: Some(ConfigMapVolumeSource {
283            name: mz.console_configmap_name(),
284            default_mode: Some(256),
285            optional: Some(false),
286            items: Some(vec![KeyToPath {
287                key: "app-config.json".to_string(),
288                path: "app-config.json".to_string(),
289                ..Default::default()
290            }]),
291        }),
292        ..Default::default()
293    }];
294    let mut volume_mounts = vec![VolumeMount {
295        name: "app-config".to_string(),
296        mount_path: "/usr/share/nginx/html/app-config".to_string(),
297        ..Default::default()
298    }];
299
300    let scheme = if issuer_ref_defined(
301        &config.default_certificate_specs.console_external,
302        &mz.spec.console_external_certificate_spec,
303    ) {
304        volumes.push(Volume {
305            name: "external-certificate".to_owned(),
306            secret: Some(SecretVolumeSource {
307                default_mode: Some(0o400),
308                secret_name: Some(mz.console_external_certificate_secret_name()),
309                items: None,
310                optional: Some(false),
311            }),
312            ..Default::default()
313        });
314        volume_mounts.push(VolumeMount {
315            name: "external-certificate".to_owned(),
316            mount_path: "/nginx/tls".to_owned(),
317            read_only: Some(true),
318            ..Default::default()
319        });
320        env.push(EnvVar {
321            name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
322            value: Some(format!(
323                "listen {} ssl;
324ssl_certificate /nginx/tls/tls.crt;
325ssl_certificate_key /nginx/tls/tls.key;",
326                config.console_http_port
327            )),
328            ..Default::default()
329        });
330        Some("HTTPS".to_owned())
331    } else {
332        env.push(EnvVar {
333            name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
334            value: Some(format!("listen {};", config.console_http_port)),
335            ..Default::default()
336        });
337        Some("HTTP".to_owned())
338    };
339
340    let probe = Probe {
341        http_get: Some(HTTPGetAction {
342            path: Some("/".to_string()),
343            port: IntOrString::Int(config.console_http_port.into()),
344            scheme,
345            ..Default::default()
346        }),
347        ..Default::default()
348    };
349
350    let security_context = if config.enable_security_context {
351        // Since we want to adhere to the most restrictive security context, all
352        // of these fields have to be set how they are.
353        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
354        Some(SecurityContext {
355            run_as_non_root: Some(true),
356            capabilities: Some(Capabilities {
357                drop: Some(vec!["ALL".to_string()]),
358                ..Default::default()
359            }),
360            seccomp_profile: Some(SeccompProfile {
361                type_: "RuntimeDefault".to_string(),
362                ..Default::default()
363            }),
364            allow_privilege_escalation: Some(false),
365            ..Default::default()
366        })
367    } else {
368        None
369    };
370
371    let container = Container {
372        name: "console".to_owned(),
373        image: Some(console_image_ref.to_string()),
374        image_pull_policy: Some(config.image_pull_policy.to_string()),
375        ports: Some(ports),
376        env: Some(env),
377        startup_probe: Some(Probe {
378            period_seconds: Some(1),
379            failure_threshold: Some(10),
380            ..probe.clone()
381        }),
382        readiness_probe: Some(Probe {
383            period_seconds: Some(30),
384            failure_threshold: Some(1),
385            ..probe.clone()
386        }),
387        liveness_probe: Some(Probe {
388            period_seconds: Some(30),
389            ..probe.clone()
390        }),
391        resources: mz
392            .spec
393            .console_resource_requirements
394            .clone()
395            .or_else(|| config.console_default_resources.clone()),
396        security_context,
397        volume_mounts: Some(volume_mounts),
398        ..Default::default()
399    };
400
401    let deployment_spec = DeploymentSpec {
402        replicas: Some(mz.console_replicas()),
403        selector: LabelSelector {
404            match_labels: Some(pod_template_labels.clone()),
405            ..Default::default()
406        },
407        template: PodTemplateSpec {
408            // not using managed_resource_meta because the pod should be
409            // owned by the deployment, not the materialize instance
410            metadata: Some(ObjectMeta {
411                labels: Some(pod_template_labels),
412                ..Default::default()
413            }),
414            spec: Some(PodSpec {
415                containers: vec![container],
416                node_selector: Some(
417                    config
418                        .console_node_selector
419                        .iter()
420                        .map(|selector| (selector.key.clone(), selector.value.clone()))
421                        .collect(),
422                ),
423                affinity: config.console_affinity.clone(),
424                tolerations: config.console_tolerations.clone(),
425                scheduler_name: config.scheduler_name.clone(),
426                service_account_name: Some(mz.service_account_name()),
427                volumes: Some(volumes),
428                security_context: Some(PodSecurityContext {
429                    fs_group: Some(101),
430                    ..Default::default()
431                }),
432                ..Default::default()
433            }),
434        },
435        ..Default::default()
436    };
437
438    Deployment {
439        metadata: ObjectMeta {
440            ..mz.managed_resource_meta(mz.console_deployment_name())
441        },
442        spec: Some(deployment_spec),
443        status: None,
444    }
445}
446
447fn create_console_service_object(
448    config: &super::MaterializeControllerArgs,
449    mz: &Materialize,
450) -> Service {
451    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()};
452
453    let ports = vec![ServicePort {
454        name: Some("http".to_string()),
455        protocol: Some("TCP".to_string()),
456        port: config.console_http_port.into(),
457        target_port: Some(IntOrString::Int(config.console_http_port.into())),
458        ..Default::default()
459    }];
460
461    let spec = ServiceSpec {
462        type_: Some("ClusterIP".to_string()),
463        cluster_ip: Some("None".to_string()),
464        selector: Some(selector),
465        ports: Some(ports),
466        ..Default::default()
467    };
468
469    Service {
470        metadata: mz.managed_resource_meta(mz.console_service_name()),
471        spec: Some(spec),
472        status: None,
473    }
474}