mz_orchestratord/controller/materialize/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use k8s_openapi::{
11    api::{
12        apps::v1::{Deployment, DeploymentSpec},
13        core::v1::{
14            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
15            HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16            SeccompProfile, SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec,
17            Volume, VolumeMount,
18        },
19        networking::v1::{
20            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21            NetworkPolicySpec,
22        },
23    },
24    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
25};
26use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
27use maplit::btreemap;
28use mz_server_core::listeners::AuthenticatorKind;
29use serde::Serialize;
30use tracing::trace;
31
32use crate::{
33    controller::materialize::tls::{create_certificate, issuer_ref_defined},
34    k8s::{apply_resource, delete_resource},
35};
36use mz_cloud_resources::crd::{
37    generated::cert_manager::certificates::Certificate, materialize::v1alpha1::Materialize,
38};
39
40pub struct Resources {
41    network_policies: Vec<NetworkPolicy>,
42    console_configmap: Box<ConfigMap>,
43    console_deployment: Box<Deployment>,
44    console_service: Box<Service>,
45    console_external_certificate: Box<Option<Certificate>>,
46}
47
48impl Resources {
49    pub fn new(
50        config: &super::MaterializeControllerArgs,
51        mz: &Materialize,
52        console_image_ref: &str,
53    ) -> Self {
54        let network_policies = create_network_policies(config, mz);
55        let console_configmap =
56            Box::new(create_console_app_configmap_object(mz, console_image_ref));
57        let console_deployment = Box::new(create_console_deployment_object(
58            config,
59            mz,
60            console_image_ref,
61        ));
62        let console_service = Box::new(create_console_service_object(config, mz));
63        let console_external_certificate =
64            Box::new(create_console_external_certificate(config, mz));
65        Self {
66            network_policies,
67            console_configmap,
68            console_deployment,
69            console_service,
70            console_external_certificate,
71        }
72    }
73
74    pub async fn apply(
75        &self,
76        client: &Client,
77        namespace: &str,
78    ) -> Result<Option<Action>, anyhow::Error> {
79        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
80        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
81        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
82        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
83        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
84
85        trace!("creating new network policies");
86        for network_policy in &self.network_policies {
87            apply_resource(&network_policy_api, network_policy).await?;
88        }
89
90        trace!("creating new console configmap");
91        apply_resource(&configmap_api, &self.console_configmap).await?;
92
93        trace!("creating new console deployment");
94        apply_resource(&deployment_api, &self.console_deployment).await?;
95
96        trace!("creating new console service");
97        apply_resource(&service_api, &self.console_service).await?;
98
99        if let Some(certificate) = &*self.console_external_certificate {
100            trace!("creating new console external certificate");
101            apply_resource(&certificate_api, certificate).await?;
102        }
103
104        Ok(None)
105    }
106
107    pub async fn cleanup(
108        &self,
109        client: &Client,
110        namespace: &str,
111    ) -> Result<Option<Action>, anyhow::Error> {
112        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
113        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
114        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
115        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
116        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
117
118        if let Some(certificate) = &*self.console_external_certificate {
119            trace!("deleting console external certificate");
120            delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
121        }
122
123        trace!("deleting console service");
124        delete_resource(&service_api, &self.console_service.name_unchecked()).await?;
125
126        trace!("deleting console deployment");
127        delete_resource(&deployment_api, &self.console_deployment.name_unchecked()).await?;
128
129        trace!("deleting console configmap");
130        delete_resource(&configmap_api, &self.console_configmap.name_unchecked()).await?;
131
132        trace!("deleting network policies");
133        for network_policy in &self.network_policies {
134            delete_resource(&network_policy_api, &network_policy.name_unchecked()).await?;
135        }
136
137        Ok(None)
138    }
139}
140
141fn create_network_policies(
142    config: &super::MaterializeControllerArgs,
143    mz: &Materialize,
144) -> Vec<NetworkPolicy> {
145    let mut network_policies = Vec::new();
146    if config.network_policies.ingress_enabled {
147        let console_label_selector = LabelSelector {
148            match_labels: Some(
149                mz.default_labels()
150                    .into_iter()
151                    .chain([("materialize.cloud/app".to_owned(), mz.console_app_name())])
152                    .collect(),
153            ),
154            ..Default::default()
155        };
156        network_policies.extend([NetworkPolicy {
157            metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")),
158            spec: Some(NetworkPolicySpec {
159                ingress: Some(vec![NetworkPolicyIngressRule {
160                    from: Some(
161                        config
162                            .network_policies
163                            .ingress_cidrs
164                            .iter()
165                            .map(|cidr| NetworkPolicyPeer {
166                                ip_block: Some(IPBlock {
167                                    cidr: cidr.to_owned(),
168                                    except: None,
169                                }),
170                                ..Default::default()
171                            })
172                            .collect(),
173                    ),
174                    ports: Some(vec![NetworkPolicyPort {
175                        port: Some(IntOrString::Int(config.console_http_port.into())),
176                        protocol: Some("TCP".to_string()),
177                        ..Default::default()
178                    }]),
179                    ..Default::default()
180                }]),
181                pod_selector: console_label_selector,
182                policy_types: Some(vec!["Ingress".to_owned()]),
183                ..Default::default()
184            }),
185        }]);
186    }
187    network_policies
188}
189
190fn create_console_external_certificate(
191    config: &super::MaterializeControllerArgs,
192    mz: &Materialize,
193) -> Option<Certificate> {
194    create_certificate(
195        config.default_certificate_specs.console_external.clone(),
196        mz,
197        mz.spec.console_external_certificate_spec.clone(),
198        mz.console_external_certificate_name(),
199        mz.console_external_certificate_secret_name(),
200        None,
201    )
202}
203
204#[derive(Serialize)]
205struct ConsoleAppConfig {
206    version: String,
207    auth: ConsoleAppConfigAuth,
208}
209
210#[derive(Serialize)]
211struct ConsoleAppConfigAuth {
212    mode: AuthenticatorKind,
213}
214
215fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str) -> ConfigMap {
216    let version: String = console_image_ref
217        .rsplitn(2, ':')
218        .next()
219        .expect("at least one chunk, even if empty")
220        .to_owned();
221    let app_config_json = serde_json::to_string(&ConsoleAppConfig {
222        version,
223        auth: ConsoleAppConfigAuth {
224            mode: mz.spec.authenticator_kind,
225        },
226    })
227    .expect("known valid");
228    ConfigMap {
229        binary_data: None,
230        data: Some(btreemap! {
231            "app-config.json".to_owned() => app_config_json,
232        }),
233        immutable: None,
234        metadata: mz.managed_resource_meta(mz.console_configmap_name()),
235    }
236}
237
238fn create_console_deployment_object(
239    config: &super::MaterializeControllerArgs,
240    mz: &Materialize,
241    console_image_ref: &str,
242) -> Deployment {
243    let mut pod_template_labels = mz.default_labels();
244    pod_template_labels.insert(
245        "materialize.cloud/name".to_owned(),
246        mz.console_deployment_name(),
247    );
248    pod_template_labels.insert("app".to_owned(), "console".to_string());
249    pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.console_app_name());
250
251    let ports = vec![ContainerPort {
252        container_port: config.console_http_port.into(),
253        name: Some("http".into()),
254        protocol: Some("TCP".into()),
255        ..Default::default()
256    }];
257
258    let scheme = if issuer_ref_defined(
259        &config.default_certificate_specs.balancerd_external,
260        &mz.spec.balancerd_external_certificate_spec,
261    ) {
262        "https"
263    } else {
264        "http"
265    };
266    let mut env = vec![EnvVar {
267        name: "MZ_ENDPOINT".to_string(),
268        value: Some(format!(
269            "{}://{}.{}.svc.cluster.local:{}",
270            scheme,
271            mz.balancerd_service_name(),
272            mz.namespace(),
273            config.balancerd_http_port,
274        )),
275        ..Default::default()
276    }];
277    let mut volumes = vec![Volume {
278        name: "app-config".to_string(),
279        config_map: Some(ConfigMapVolumeSource {
280            name: mz.console_configmap_name(),
281            default_mode: Some(256),
282            optional: Some(false),
283            items: Some(vec![KeyToPath {
284                key: "app-config.json".to_string(),
285                path: "app-config.json".to_string(),
286                ..Default::default()
287            }]),
288        }),
289        ..Default::default()
290    }];
291    let mut volume_mounts = vec![VolumeMount {
292        name: "app-config".to_string(),
293        mount_path: "/usr/share/nginx/html/app-config".to_string(),
294        ..Default::default()
295    }];
296
297    let scheme = if issuer_ref_defined(
298        &config.default_certificate_specs.console_external,
299        &mz.spec.console_external_certificate_spec,
300    ) {
301        volumes.push(Volume {
302            name: "external-certificate".to_owned(),
303            secret: Some(SecretVolumeSource {
304                default_mode: Some(0o400),
305                secret_name: Some(mz.console_external_certificate_secret_name()),
306                items: None,
307                optional: Some(false),
308            }),
309            ..Default::default()
310        });
311        volume_mounts.push(VolumeMount {
312            name: "external-certificate".to_owned(),
313            mount_path: "/nginx/tls".to_owned(),
314            read_only: Some(true),
315            ..Default::default()
316        });
317        env.push(EnvVar {
318            name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
319            value: Some(format!(
320                "listen {} ssl;
321ssl_certificate /nginx/tls/tls.crt;
322ssl_certificate_key /nginx/tls/tls.key;",
323                config.console_http_port
324            )),
325            ..Default::default()
326        });
327        Some("HTTPS".to_owned())
328    } else {
329        env.push(EnvVar {
330            name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
331            value: Some(format!("listen {};", config.console_http_port)),
332            ..Default::default()
333        });
334        Some("HTTP".to_owned())
335    };
336
337    let probe = Probe {
338        http_get: Some(HTTPGetAction {
339            path: Some("/".to_string()),
340            port: IntOrString::Int(config.console_http_port.into()),
341            scheme,
342            ..Default::default()
343        }),
344        ..Default::default()
345    };
346
347    let security_context = if config.enable_security_context {
348        // Since we want to adhere to the most restrictive security context, all
349        // of these fields have to be set how they are.
350        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
351        Some(SecurityContext {
352            run_as_non_root: Some(true),
353            capabilities: Some(Capabilities {
354                drop: Some(vec!["ALL".to_string()]),
355                ..Default::default()
356            }),
357            seccomp_profile: Some(SeccompProfile {
358                type_: "RuntimeDefault".to_string(),
359                ..Default::default()
360            }),
361            allow_privilege_escalation: Some(false),
362            ..Default::default()
363        })
364    } else {
365        None
366    };
367
368    let container = Container {
369        name: "console".to_owned(),
370        image: Some(console_image_ref.to_string()),
371        image_pull_policy: Some(config.image_pull_policy.to_string()),
372        ports: Some(ports),
373        env: Some(env),
374        startup_probe: Some(Probe {
375            period_seconds: Some(1),
376            failure_threshold: Some(10),
377            ..probe.clone()
378        }),
379        readiness_probe: Some(Probe {
380            period_seconds: Some(30),
381            failure_threshold: Some(1),
382            ..probe.clone()
383        }),
384        liveness_probe: Some(Probe {
385            period_seconds: Some(30),
386            ..probe.clone()
387        }),
388        resources: mz.spec.console_resource_requirements.clone(),
389        security_context,
390        volume_mounts: Some(volume_mounts),
391        ..Default::default()
392    };
393
394    let deployment_spec = DeploymentSpec {
395        replicas: Some(2),
396        selector: LabelSelector {
397            match_labels: Some(pod_template_labels.clone()),
398            ..Default::default()
399        },
400        template: PodTemplateSpec {
401            // not using managed_resource_meta because the pod should be
402            // owned by the deployment, not the materialize instance
403            metadata: Some(ObjectMeta {
404                labels: Some(pod_template_labels),
405                ..Default::default()
406            }),
407            spec: Some(PodSpec {
408                containers: vec![container],
409                node_selector: Some(
410                    config
411                        .console_node_selector
412                        .iter()
413                        .map(|selector| (selector.key.clone(), selector.value.clone()))
414                        .collect(),
415                ),
416                affinity: config.console_affinity.clone(),
417                tolerations: config.console_tolerations.clone(),
418                scheduler_name: config.scheduler_name.clone(),
419                service_account_name: Some(mz.service_account_name()),
420                volumes: Some(volumes),
421                security_context: Some(PodSecurityContext {
422                    fs_group: Some(101),
423                    ..Default::default()
424                }),
425                ..Default::default()
426            }),
427        },
428        ..Default::default()
429    };
430
431    Deployment {
432        metadata: ObjectMeta {
433            ..mz.managed_resource_meta(mz.console_deployment_name())
434        },
435        spec: Some(deployment_spec),
436        status: None,
437    }
438}
439
440fn create_console_service_object(
441    config: &super::MaterializeControllerArgs,
442    mz: &Materialize,
443) -> Service {
444    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()};
445
446    let ports = vec![ServicePort {
447        name: Some("http".to_string()),
448        protocol: Some("TCP".to_string()),
449        port: config.console_http_port.into(),
450        target_port: Some(IntOrString::Int(config.console_http_port.into())),
451        ..Default::default()
452    }];
453
454    let spec = ServiceSpec {
455        type_: Some("ClusterIP".to_string()),
456        cluster_ip: Some("None".to_string()),
457        selector: Some(selector),
458        ports: Some(ports),
459        ..Default::default()
460    };
461
462    Service {
463        metadata: mz.managed_resource_meta(mz.console_service_name()),
464        spec: Some(spec),
465        status: None,
466    }
467}