mz_orchestratord/controller/materialize/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use k8s_openapi::{
11    api::{
12        apps::v1::{Deployment, DeploymentSpec},
13        core::v1::{
14            Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
15            HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16            SeccompProfile, SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec,
17            Volume, VolumeMount,
18        },
19        networking::v1::{
20            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21            NetworkPolicySpec,
22        },
23    },
24    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
25};
26use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
27use maplit::btreemap;
28use mz_server_core::listeners::AuthenticatorKind;
29use serde::Serialize;
30use tracing::trace;
31
32use crate::{
33    k8s::{apply_resource, delete_resource},
34    tls::{create_certificate, issuer_ref_defined},
35};
36use mz_cloud_resources::crd::{
37    ManagedResource,
38    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
39    materialize::v1alpha1::Materialize,
40};
41
42pub struct Resources {
43    network_policies: Vec<NetworkPolicy>,
44    console_configmap: Box<ConfigMap>,
45    console_deployment: Box<Deployment>,
46    console_service: Box<Service>,
47    console_external_certificate: Box<Option<Certificate>>,
48}
49
50impl Resources {
51    pub fn new(config: &super::Config, mz: &Materialize, console_image_ref: &str) -> Self {
52        let network_policies = create_network_policies(config, mz);
53        let console_configmap =
54            Box::new(create_console_app_configmap_object(mz, console_image_ref));
55        let console_deployment = Box::new(create_console_deployment_object(
56            config,
57            mz,
58            console_image_ref,
59        ));
60        let console_service = Box::new(create_console_service_object(config, mz));
61        let console_external_certificate =
62            Box::new(create_console_external_certificate(config, mz));
63        Self {
64            network_policies,
65            console_configmap,
66            console_deployment,
67            console_service,
68            console_external_certificate,
69        }
70    }
71
72    pub async fn apply(
73        &self,
74        client: &Client,
75        namespace: &str,
76    ) -> Result<Option<Action>, anyhow::Error> {
77        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
78        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
79        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
80        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
81        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
82
83        trace!("creating new network policies");
84        for network_policy in &self.network_policies {
85            apply_resource(&network_policy_api, network_policy).await?;
86        }
87
88        trace!("creating new console configmap");
89        apply_resource(&configmap_api, &self.console_configmap).await?;
90
91        trace!("creating new console deployment");
92        apply_resource(&deployment_api, &self.console_deployment).await?;
93
94        trace!("creating new console service");
95        apply_resource(&service_api, &self.console_service).await?;
96
97        if let Some(certificate) = &*self.console_external_certificate {
98            trace!("creating new console external certificate");
99            apply_resource(&certificate_api, certificate).await?;
100        }
101
102        Ok(None)
103    }
104
105    pub async fn cleanup(
106        &self,
107        client: &Client,
108        namespace: &str,
109    ) -> Result<Option<Action>, anyhow::Error> {
110        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
111        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
112        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
113        let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
114        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
115
116        if let Some(certificate) = &*self.console_external_certificate {
117            trace!("deleting console external certificate");
118            delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
119        }
120
121        trace!("deleting console service");
122        delete_resource(&service_api, &self.console_service.name_unchecked()).await?;
123
124        trace!("deleting console deployment");
125        delete_resource(&deployment_api, &self.console_deployment.name_unchecked()).await?;
126
127        trace!("deleting console configmap");
128        delete_resource(&configmap_api, &self.console_configmap.name_unchecked()).await?;
129
130        trace!("deleting network policies");
131        for network_policy in &self.network_policies {
132            delete_resource(&network_policy_api, &network_policy.name_unchecked()).await?;
133        }
134
135        Ok(None)
136    }
137}
138
139fn create_network_policies(config: &super::Config, mz: &Materialize) -> Vec<NetworkPolicy> {
140    let mut network_policies = Vec::new();
141    if config.network_policies_ingress_enabled {
142        let console_label_selector = LabelSelector {
143            match_labels: Some(
144                mz.default_labels()
145                    .into_iter()
146                    .chain([("materialize.cloud/app".to_owned(), mz.console_app_name())])
147                    .collect(),
148            ),
149            ..Default::default()
150        };
151        network_policies.extend([NetworkPolicy {
152            metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")),
153            spec: Some(NetworkPolicySpec {
154                ingress: Some(vec![NetworkPolicyIngressRule {
155                    from: Some(
156                        config
157                            .network_policies_ingress_cidrs
158                            .iter()
159                            .map(|cidr| NetworkPolicyPeer {
160                                ip_block: Some(IPBlock {
161                                    cidr: cidr.to_owned(),
162                                    except: None,
163                                }),
164                                ..Default::default()
165                            })
166                            .collect(),
167                    ),
168                    ports: Some(vec![NetworkPolicyPort {
169                        port: Some(IntOrString::Int(config.console_http_port.into())),
170                        protocol: Some("TCP".to_string()),
171                        ..Default::default()
172                    }]),
173                    ..Default::default()
174                }]),
175                pod_selector: Some(console_label_selector),
176                policy_types: Some(vec!["Ingress".to_owned()]),
177                ..Default::default()
178            }),
179        }]);
180    }
181    network_policies
182}
183
184fn create_console_external_certificate(
185    config: &super::Config,
186    mz: &Materialize,
187) -> Option<Certificate> {
188    create_certificate(
189        config.default_certificate_specs.console_external.clone(),
190        mz,
191        mz.spec.console_external_certificate_spec.clone(),
192        mz.console_external_certificate_name(),
193        mz.console_external_certificate_secret_name(),
194        None,
195        CertificatePrivateKeyAlgorithm::Rsa,
196        Some(4096),
197    )
198}
199
200#[derive(Serialize)]
201struct ConsoleAppConfig {
202    version: String,
203    auth: ConsoleAppConfigAuth,
204}
205
206#[derive(Serialize)]
207struct ConsoleAppConfigAuth {
208    mode: AuthenticatorKind,
209}
210
211fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str) -> ConfigMap {
212    let version: String = console_image_ref
213        .rsplitn(2, ':')
214        .next()
215        .expect("at least one chunk, even if empty")
216        .to_owned();
217    let app_config_json = serde_json::to_string(&ConsoleAppConfig {
218        version,
219        auth: ConsoleAppConfigAuth {
220            mode: mz.spec.authenticator_kind,
221        },
222    })
223    .expect("known valid");
224    ConfigMap {
225        binary_data: None,
226        data: Some(btreemap! {
227            "app-config.json".to_owned() => app_config_json,
228        }),
229        immutable: None,
230        metadata: mz.managed_resource_meta(mz.console_configmap_name()),
231    }
232}
233
234fn create_console_deployment_object(
235    config: &super::Config,
236    mz: &Materialize,
237    console_image_ref: &str,
238) -> Deployment {
239    let mut pod_template_labels = mz.default_labels();
240    pod_template_labels.insert(
241        "materialize.cloud/name".to_owned(),
242        mz.console_deployment_name(),
243    );
244    pod_template_labels.insert("app".to_owned(), "console".to_string());
245    pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.console_app_name());
246
247    let ports = vec![ContainerPort {
248        container_port: config.console_http_port.into(),
249        name: Some("http".into()),
250        protocol: Some("TCP".into()),
251        ..Default::default()
252    }];
253
254    let scheme = if issuer_ref_defined(
255        &config.default_certificate_specs.balancerd_external,
256        &mz.spec.balancerd_external_certificate_spec,
257    ) {
258        "https"
259    } else {
260        "http"
261    };
262    let mut env = vec![EnvVar {
263        name: "MZ_ENDPOINT".to_string(),
264        value: Some(format!(
265            "{}://{}.{}.svc.cluster.local:{}",
266            scheme,
267            mz.balancerd_service_name(),
268            mz.namespace(),
269            config.balancerd_http_port,
270        )),
271        ..Default::default()
272    }];
273    let mut volumes = vec![Volume {
274        name: "app-config".to_string(),
275        config_map: Some(ConfigMapVolumeSource {
276            name: mz.console_configmap_name(),
277            default_mode: Some(256),
278            optional: Some(false),
279            items: Some(vec![KeyToPath {
280                key: "app-config.json".to_string(),
281                path: "app-config.json".to_string(),
282                ..Default::default()
283            }]),
284        }),
285        ..Default::default()
286    }];
287    let mut volume_mounts = vec![VolumeMount {
288        name: "app-config".to_string(),
289        mount_path: "/usr/share/nginx/html/app-config".to_string(),
290        ..Default::default()
291    }];
292
293    let scheme = if issuer_ref_defined(
294        &config.default_certificate_specs.console_external,
295        &mz.spec.console_external_certificate_spec,
296    ) {
297        volumes.push(Volume {
298            name: "external-certificate".to_owned(),
299            secret: Some(SecretVolumeSource {
300                default_mode: Some(0o400),
301                secret_name: Some(mz.console_external_certificate_secret_name()),
302                items: None,
303                optional: Some(false),
304            }),
305            ..Default::default()
306        });
307        volume_mounts.push(VolumeMount {
308            name: "external-certificate".to_owned(),
309            mount_path: "/nginx/tls".to_owned(),
310            read_only: Some(true),
311            ..Default::default()
312        });
313        env.push(EnvVar {
314            name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
315            value: Some(format!(
316                "listen {} ssl;
317ssl_certificate /nginx/tls/tls.crt;
318ssl_certificate_key /nginx/tls/tls.key;",
319                config.console_http_port
320            )),
321            ..Default::default()
322        });
323        Some("HTTPS".to_owned())
324    } else {
325        env.push(EnvVar {
326            name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
327            value: Some(format!("listen {};", config.console_http_port)),
328            ..Default::default()
329        });
330        Some("HTTP".to_owned())
331    };
332
333    let probe = Probe {
334        http_get: Some(HTTPGetAction {
335            path: Some("/".to_string()),
336            port: IntOrString::Int(config.console_http_port.into()),
337            scheme,
338            ..Default::default()
339        }),
340        ..Default::default()
341    };
342
343    let security_context = if config.enable_security_context {
344        // Since we want to adhere to the most restrictive security context, all
345        // of these fields have to be set how they are.
346        // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
347        Some(SecurityContext {
348            run_as_non_root: Some(true),
349            capabilities: Some(Capabilities {
350                drop: Some(vec!["ALL".to_string()]),
351                ..Default::default()
352            }),
353            seccomp_profile: Some(SeccompProfile {
354                type_: "RuntimeDefault".to_string(),
355                ..Default::default()
356            }),
357            allow_privilege_escalation: Some(false),
358            ..Default::default()
359        })
360    } else {
361        None
362    };
363
364    let container = Container {
365        name: "console".to_owned(),
366        image: Some(console_image_ref.to_string()),
367        image_pull_policy: Some(config.image_pull_policy.to_string()),
368        ports: Some(ports),
369        env: Some(env),
370        startup_probe: Some(Probe {
371            period_seconds: Some(1),
372            failure_threshold: Some(10),
373            ..probe.clone()
374        }),
375        readiness_probe: Some(Probe {
376            period_seconds: Some(30),
377            failure_threshold: Some(1),
378            ..probe.clone()
379        }),
380        liveness_probe: Some(Probe {
381            period_seconds: Some(30),
382            ..probe.clone()
383        }),
384        resources: mz
385            .spec
386            .console_resource_requirements
387            .clone()
388            .or_else(|| config.console_default_resources.clone()),
389        security_context,
390        volume_mounts: Some(volume_mounts),
391        ..Default::default()
392    };
393
394    let deployment_spec = DeploymentSpec {
395        replicas: Some(mz.console_replicas()),
396        selector: LabelSelector {
397            match_labels: Some(pod_template_labels.clone()),
398            ..Default::default()
399        },
400        template: PodTemplateSpec {
401            // not using managed_resource_meta because the pod should be
402            // owned by the deployment, not the materialize instance
403            metadata: Some(ObjectMeta {
404                labels: Some(pod_template_labels),
405                ..Default::default()
406            }),
407            spec: Some(PodSpec {
408                containers: vec![container],
409                node_selector: Some(
410                    config
411                        .console_node_selector
412                        .iter()
413                        .map(|selector| (selector.key.clone(), selector.value.clone()))
414                        .collect(),
415                ),
416                affinity: config.console_affinity.clone(),
417                tolerations: config.console_tolerations.clone(),
418                scheduler_name: config.scheduler_name.clone(),
419                service_account_name: Some(mz.service_account_name()),
420                volumes: Some(volumes),
421                security_context: Some(PodSecurityContext {
422                    fs_group: Some(101),
423                    ..Default::default()
424                }),
425                ..Default::default()
426            }),
427        },
428        ..Default::default()
429    };
430
431    Deployment {
432        metadata: ObjectMeta {
433            ..mz.managed_resource_meta(mz.console_deployment_name())
434        },
435        spec: Some(deployment_spec),
436        status: None,
437    }
438}
439
440fn create_console_service_object(config: &super::Config, mz: &Materialize) -> Service {
441    let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()};
442
443    let ports = vec![ServicePort {
444        name: Some("http".to_string()),
445        protocol: Some("TCP".to_string()),
446        port: config.console_http_port.into(),
447        target_port: Some(IntOrString::Int(config.console_http_port.into())),
448        ..Default::default()
449    }];
450
451    let spec = ServiceSpec {
452        type_: Some("ClusterIP".to_string()),
453        cluster_ip: Some("None".to_string()),
454        selector: Some(selector),
455        ports: Some(ports),
456        ..Default::default()
457    };
458
459    Service {
460        metadata: mz.managed_resource_meta(mz.console_service_name()),
461        spec: Some(spec),
462        status: None,
463    }
464}