mz_orchestratord/controller/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use k8s_openapi::{
11    api::{
12        apps::v1::{Deployment, DeploymentSpec},
13        core::v1::{
14            Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
15            EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16            ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
17            ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
18        },
19        networking::v1::{
20            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21            NetworkPolicySpec,
22        },
23    },
24    apimachinery::pkg::{
25        apis::meta::v1::{Condition, LabelSelector, Time},
26        util::intstr::IntOrString,
27    },
28};
29use kube::{
30    Api, Client, Resource, ResourceExt,
31    api::{ObjectMeta, PostParams},
32    runtime::{
33        controller::Action,
34        reflector::{ObjectRef, Store},
35    },
36};
37use maplit::btreemap;
38use serde::Serialize;
39use tracing::trace;
40
41use crate::{
42    Error,
43    k8s::{apply_resource, make_reflector},
44    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
45};
46use mz_cloud_resources::crd::{
47    ManagedResource,
48    console::v1alpha1::{Console, HttpConnectionScheme},
49    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
50};
51use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
52use mz_ore::{cli::KeyValueArg, instrument};
53use mz_server_core::listeners::AuthenticatorKind;
54
/// Operator-level settings applied to every console this controller
/// reconciles.
pub struct Config {
    /// When true, the console container is given a restricted
    /// `SecurityContext`; when false, no container security context is set.
    pub enable_security_context: bool,
    /// NOTE(review): not read by any code visible in this file — confirm
    /// where (or whether) it is consumed.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy set on the console container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name copied into the console pod spec.
    pub scheduler_name: Option<String>,
    /// Key/value pairs collected into the console pod's node selector.
    pub console_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied into the console pod spec.
    pub console_affinity: Option<Affinity>,
    /// Tolerations copied into the console pod spec.
    pub console_tolerations: Option<Vec<Toleration>>,
    /// Container resources used when the `Console` spec does not provide
    /// its own resource requirements.
    pub console_default_resources: Option<ResourceRequirements>,
    /// Whether an ingress `NetworkPolicy` is created for the console pods.
    pub network_policies_ingress_enabled: bool,
    /// CIDR blocks allowed as ingress sources in that network policy.
    pub network_policies_ingress_cidrs: Vec<String>,

    /// Default certificate specs; this file uses the `console_external`
    /// entry.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port the console listens on. Used for the nginx listener config, the
    /// container port, the probes, the service and the network policy.
    pub console_http_port: u16,
    /// Port placed in the `MZ_ENDPOINT` URL that points at balancerd.
    pub balancerd_http_port: u16,
}
73
/// Payload serialized to JSON and stored under the `app-config.json` key of
/// the console's `ConfigMap`.
#[derive(Serialize)]
struct AppConfig {
    /// Text following the last `:` of the console image reference (the
    /// whole reference when it contains no `:`).
    version: String,
    /// Authentication settings.
    auth: AppConfigAuth,
}
79
/// Authentication section of the serialized console app config.
#[derive(Serialize)]
struct AppConfigAuth {
    /// Authenticator kind copied from the `Console` spec.
    mode: AuthenticatorKind,
}
84
/// Reconciliation context for `Console` resources.
pub struct Context {
    /// Operator-level settings used when building the console's resources.
    config: Config,
    /// Store of `Deployment` objects obtained from `make_reflector`; read
    /// when syncing deployment availability into the console status.
    deployments: Store<Deployment>,
}
89
90impl Context {
91    pub async fn new(config: Config, client: Client) -> Self {
92        Self {
93            config,
94            deployments: make_reflector(client).await,
95        }
96    }
97
98    async fn sync_deployment_status(
99        &self,
100        client: &Client,
101        console: &Console,
102    ) -> Result<(), kube::Error> {
103        let namespace = console.namespace().unwrap();
104        let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
105
106        let Some(deployment) = self
107            .deployments
108            .get(&ObjectRef::new(&console.deployment_name()).within(&namespace))
109        else {
110            return Ok(());
111        };
112
113        let Some(deployment_conditions) = &deployment
114            .status
115            .as_ref()
116            .and_then(|status| status.conditions.as_ref())
117        else {
118            // if the deployment doesn't have any conditions set yet, there
119            // is nothing to sync
120            return Ok(());
121        };
122
123        let ready = deployment_conditions
124            .iter()
125            .any(|condition| condition.type_ == "Available" && condition.status == "True");
126        let ready_str = if ready { "True" } else { "False" };
127
128        let mut status = console.status.clone().unwrap();
129        if status
130            .conditions
131            .iter()
132            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
133        {
134            // if the deployment status is already set correctly, we don't
135            // need to set it again (this prevents us from getting stuck in
136            // a reconcile loop)
137            return Ok(());
138        }
139
140        status.conditions = vec![Condition {
141            type_: "Ready".to_string(),
142            status: ready_str.to_string(),
143            last_transition_time: Time(chrono::offset::Utc::now()),
144            message: format!(
145                "console deployment is{} ready",
146                if ready { "" } else { " not" }
147            ),
148            observed_generation: None,
149            reason: "DeploymentStatus".to_string(),
150        }];
151        let mut new_console = console.clone();
152        new_console.status = Some(status);
153
154        console_api
155            .replace_status(
156                &console.name_unchecked(),
157                &PostParams::default(),
158                serde_json::to_vec(&new_console).unwrap(),
159            )
160            .await?;
161
162        Ok(())
163    }
164
165    fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
166        let mut network_policies = Vec::new();
167        if self.config.network_policies_ingress_enabled {
168            let console_label_selector = LabelSelector {
169                match_labels: Some(
170                    console
171                        .default_labels()
172                        .into_iter()
173                        .chain([("materialize.cloud/app".to_owned(), console.app_name())])
174                        .collect(),
175                ),
176                ..Default::default()
177            };
178            network_policies.extend([NetworkPolicy {
179                metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
180                spec: Some(NetworkPolicySpec {
181                    ingress: Some(vec![NetworkPolicyIngressRule {
182                        from: Some(
183                            self.config
184                                .network_policies_ingress_cidrs
185                                .iter()
186                                .map(|cidr| NetworkPolicyPeer {
187                                    ip_block: Some(IPBlock {
188                                        cidr: cidr.to_owned(),
189                                        except: None,
190                                    }),
191                                    ..Default::default()
192                                })
193                                .collect(),
194                        ),
195                        ports: Some(vec![NetworkPolicyPort {
196                            port: Some(IntOrString::Int(self.config.console_http_port.into())),
197                            protocol: Some("TCP".to_string()),
198                            ..Default::default()
199                        }]),
200                        ..Default::default()
201                    }]),
202                    pod_selector: Some(console_label_selector),
203                    policy_types: Some(vec!["Ingress".to_owned()]),
204                    ..Default::default()
205                }),
206            }]);
207        }
208        network_policies
209    }
210
211    fn create_console_external_certificate(&self, console: &Console) -> Option<Certificate> {
212        create_certificate(
213            self.config
214                .default_certificate_specs
215                .console_external
216                .clone(),
217            console,
218            console.spec.external_certificate_spec.clone(),
219            console.external_certificate_name(),
220            console.external_certificate_secret_name(),
221            None,
222            CertificatePrivateKeyAlgorithm::Rsa,
223            Some(4096),
224        )
225    }
226
227    fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
228        let version: String = console
229            .spec
230            .console_image_ref
231            .rsplitn(2, ':')
232            .next()
233            .expect("at least one chunk, even if empty")
234            .to_owned();
235        let app_config_json = serde_json::to_string(&AppConfig {
236            version,
237            auth: AppConfigAuth {
238                mode: console.spec.authenticator_kind,
239            },
240        })
241        .expect("known valid");
242        ConfigMap {
243            binary_data: None,
244            data: Some(btreemap! {
245                "app-config.json".to_owned() => app_config_json,
246            }),
247            immutable: None,
248            metadata: console.managed_resource_meta(console.configmap_name()),
249        }
250    }
251
252    fn create_console_deployment_object(&self, console: &Console) -> Deployment {
253        let mut pod_template_labels = console.default_labels();
254        pod_template_labels.insert(
255            "materialize.cloud/name".to_owned(),
256            console.deployment_name(),
257        );
258        pod_template_labels.insert("app".to_owned(), "console".to_string());
259        pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
260
261        let ports = vec![ContainerPort {
262            container_port: self.config.console_http_port.into(),
263            name: Some("http".into()),
264            protocol: Some("TCP".into()),
265            ..Default::default()
266        }];
267
268        let scheme = match console.spec.balancerd.scheme {
269            HttpConnectionScheme::Http => "http",
270            HttpConnectionScheme::Https => "https",
271        };
272        let mut env = vec![EnvVar {
273            name: "MZ_ENDPOINT".to_string(),
274            value: Some(format!(
275                "{}://{}.{}.svc.cluster.local:{}",
276                scheme,
277                console.spec.balancerd.service_name,
278                console.spec.balancerd.namespace,
279                self.config.balancerd_http_port,
280            )),
281            ..Default::default()
282        }];
283        let mut volumes = vec![Volume {
284            name: "app-config".to_string(),
285            config_map: Some(ConfigMapVolumeSource {
286                name: console.configmap_name(),
287                default_mode: Some(256),
288                optional: Some(false),
289                items: Some(vec![KeyToPath {
290                    key: "app-config.json".to_string(),
291                    path: "app-config.json".to_string(),
292                    ..Default::default()
293                }]),
294            }),
295            ..Default::default()
296        }];
297        let mut volume_mounts = vec![VolumeMount {
298            name: "app-config".to_string(),
299            mount_path: "/usr/share/nginx/html/app-config".to_string(),
300            ..Default::default()
301        }];
302
303        let scheme = if issuer_ref_defined(
304            &self.config.default_certificate_specs.console_external,
305            &console.spec.external_certificate_spec,
306        ) {
307            volumes.push(Volume {
308                name: "external-certificate".to_owned(),
309                secret: Some(SecretVolumeSource {
310                    default_mode: Some(0o400),
311                    secret_name: Some(console.external_certificate_secret_name()),
312                    items: None,
313                    optional: Some(false),
314                }),
315                ..Default::default()
316            });
317            volume_mounts.push(VolumeMount {
318                name: "external-certificate".to_owned(),
319                mount_path: "/nginx/tls".to_owned(),
320                read_only: Some(true),
321                ..Default::default()
322            });
323            env.push(EnvVar {
324                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
325                value: Some(format!(
326                    "listen {} ssl;
327ssl_certificate /nginx/tls/tls.crt;
328ssl_certificate_key /nginx/tls/tls.key;",
329                    self.config.console_http_port
330                )),
331                ..Default::default()
332            });
333            Some("HTTPS".to_owned())
334        } else {
335            env.push(EnvVar {
336                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
337                value: Some(format!("listen {};", self.config.console_http_port)),
338                ..Default::default()
339            });
340            Some("HTTP".to_owned())
341        };
342
343        let probe = Probe {
344            http_get: Some(HTTPGetAction {
345                path: Some("/".to_string()),
346                port: IntOrString::Int(self.config.console_http_port.into()),
347                scheme,
348                ..Default::default()
349            }),
350            ..Default::default()
351        };
352
353        let security_context = if self.config.enable_security_context {
354            // Since we want to adhere to the most restrictive security context, all
355            // of these fields have to be set how they are.
356            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
357            Some(SecurityContext {
358                run_as_non_root: Some(true),
359                capabilities: Some(Capabilities {
360                    drop: Some(vec!["ALL".to_string()]),
361                    ..Default::default()
362                }),
363                seccomp_profile: Some(SeccompProfile {
364                    type_: "RuntimeDefault".to_string(),
365                    ..Default::default()
366                }),
367                allow_privilege_escalation: Some(false),
368                ..Default::default()
369            })
370        } else {
371            None
372        };
373
374        let container = Container {
375            name: "console".to_owned(),
376            image: Some(console.spec.console_image_ref.clone()),
377            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
378            ports: Some(ports),
379            env: Some(env),
380            startup_probe: Some(Probe {
381                period_seconds: Some(1),
382                failure_threshold: Some(10),
383                ..probe.clone()
384            }),
385            readiness_probe: Some(Probe {
386                period_seconds: Some(30),
387                failure_threshold: Some(1),
388                ..probe.clone()
389            }),
390            liveness_probe: Some(Probe {
391                period_seconds: Some(30),
392                ..probe.clone()
393            }),
394            resources: console
395                .spec
396                .resource_requirements
397                .clone()
398                .or_else(|| self.config.console_default_resources.clone()),
399            security_context,
400            volume_mounts: Some(volume_mounts),
401            ..Default::default()
402        };
403
404        let deployment_spec = DeploymentSpec {
405            replicas: Some(console.replicas()),
406            selector: LabelSelector {
407                match_labels: Some(pod_template_labels.clone()),
408                ..Default::default()
409            },
410            template: PodTemplateSpec {
411                // not using managed_resource_meta because the pod should be
412                // owned by the deployment, not the materialize instance
413                metadata: Some(ObjectMeta {
414                    labels: Some(pod_template_labels),
415                    ..Default::default()
416                }),
417                spec: Some(PodSpec {
418                    containers: vec![container],
419                    node_selector: Some(
420                        self.config
421                            .console_node_selector
422                            .iter()
423                            .map(|selector| (selector.key.clone(), selector.value.clone()))
424                            .collect(),
425                    ),
426                    affinity: self.config.console_affinity.clone(),
427                    tolerations: self.config.console_tolerations.clone(),
428                    scheduler_name: self.config.scheduler_name.clone(),
429                    volumes: Some(volumes),
430                    security_context: Some(PodSecurityContext {
431                        fs_group: Some(101),
432                        ..Default::default()
433                    }),
434                    ..Default::default()
435                }),
436            },
437            ..Default::default()
438        };
439
440        Deployment {
441            metadata: ObjectMeta {
442                ..console.managed_resource_meta(console.deployment_name())
443            },
444            spec: Some(deployment_spec),
445            status: None,
446        }
447    }
448
449    fn create_console_service_object(&self, console: &Console) -> Service {
450        let selector =
451            btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
452
453        let ports = vec![ServicePort {
454            name: Some("http".to_string()),
455            protocol: Some("TCP".to_string()),
456            port: self.config.console_http_port.into(),
457            target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
458            ..Default::default()
459        }];
460
461        let spec = ServiceSpec {
462            type_: Some("ClusterIP".to_string()),
463            cluster_ip: Some("None".to_string()),
464            selector: Some(selector),
465            ports: Some(ports),
466            ..Default::default()
467        };
468
469        Service {
470            metadata: console.managed_resource_meta(console.service_name()),
471            spec: Some(spec),
472            status: None,
473        }
474    }
475}
476
477#[async_trait::async_trait]
478impl k8s_controller::Context for Context {
479    type Resource = Console;
480    type Error = Error;
481
482    #[instrument(fields())]
483    async fn apply(
484        &self,
485        client: Client,
486        console: &Self::Resource,
487    ) -> Result<Option<Action>, Self::Error> {
488        if console.status.is_none() {
489            let console_api: Api<Console> =
490                Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
491            let mut new_console = console.clone();
492            new_console.status = Some(console.status());
493            console_api
494                .replace_status(
495                    &console.name_unchecked(),
496                    &PostParams::default(),
497                    serde_json::to_vec(&new_console).unwrap(),
498                )
499                .await?;
500            // Updating the status should trigger a reconciliation
501            // which will include a status this time.
502            return Ok(None);
503        }
504
505        let namespace = console.namespace().unwrap();
506        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
507        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
508        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
509        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
510        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
511
512        trace!("creating new network policies");
513        let network_policies = self.create_network_policies(console);
514        for network_policy in &network_policies {
515            apply_resource(&network_policy_api, network_policy).await?;
516        }
517
518        trace!("creating new console configmap");
519        let console_configmap = self.create_console_app_configmap_object(console);
520        apply_resource(&configmap_api, &console_configmap).await?;
521
522        trace!("creating new console deployment");
523        let console_deployment = self.create_console_deployment_object(console);
524        apply_resource(&deployment_api, &console_deployment).await?;
525
526        trace!("creating new console service");
527        let console_service = self.create_console_service_object(console);
528        apply_resource(&service_api, &console_service).await?;
529
530        let console_external_certificate = self.create_console_external_certificate(console);
531        if let Some(certificate) = &console_external_certificate {
532            trace!("creating new console external certificate");
533            apply_resource(&certificate_api, certificate).await?;
534        }
535
536        self.sync_deployment_status(&client, console).await?;
537
538        Ok(None)
539    }
540}