mz_orchestratord/controller/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use k8s_openapi::{
13    api::{
14        apps::v1::{Deployment, DeploymentSpec},
15        core::v1::{
16            Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
17            EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
18            ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
19            ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
20        },
21        networking::v1::{
22            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
23            NetworkPolicySpec,
24        },
25    },
26    apimachinery::pkg::{
27        apis::meta::v1::{Condition, LabelSelector, Time},
28        util::intstr::IntOrString,
29    },
30};
31use kube::{
32    Api, Client, Resource, ResourceExt,
33    api::{DeleteParams, ObjectMeta, PostParams},
34    runtime::{
35        conditions::is_deployment_completed,
36        controller::Action,
37        reflector::{ObjectRef, Store},
38        wait::await_condition,
39    },
40};
41use maplit::btreemap;
42use serde::Serialize;
43use tracing::{trace, warn};
44
45use crate::{
46    Error,
47    k8s::{apply_resource, make_reflector, replace_resource},
48    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
49};
50use mz_cloud_resources::crd::{
51    ManagedResource,
52    console::v1alpha1::{Console, HttpConnectionScheme},
53    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
54};
55use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
56use mz_ore::{cli::KeyValueArg, instrument};
57use mz_server_core::listeners::AuthenticatorKind;
58
59pub struct Config {
60    pub enable_security_context: bool,
61    pub enable_prometheus_scrape_annotations: bool,
62
63    pub image_pull_policy: KubernetesImagePullPolicy,
64    pub scheduler_name: Option<String>,
65    pub console_node_selector: Vec<KeyValueArg<String, String>>,
66    pub console_affinity: Option<Affinity>,
67    pub console_tolerations: Option<Vec<Toleration>>,
68    pub console_default_resources: Option<ResourceRequirements>,
69    pub network_policies_ingress_enabled: bool,
70    pub network_policies_ingress_cidrs: Vec<String>,
71
72    pub default_certificate_specs: DefaultCertificateSpecs,
73
74    pub console_http_port: u16,
75    pub balancerd_http_port: u16,
76}
77
78#[derive(Serialize)]
79struct AppConfig {
80    version: String,
81    auth: AppConfigAuth,
82}
83
84#[derive(Serialize)]
85struct AppConfigAuth {
86    mode: AuthenticatorKind,
87}
88
89pub struct Context {
90    config: Config,
91    deployments: Store<Deployment>,
92}
93
94impl Context {
95    pub async fn new(config: Config, client: Client) -> Self {
96        Self {
97            config,
98            deployments: make_reflector(client).await,
99        }
100    }
101
102    async fn sync_deployment_status(
103        &self,
104        client: &Client,
105        console: &Console,
106    ) -> Result<(), kube::Error> {
107        let namespace = console.namespace();
108        let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
109
110        let Some(deployment) = self
111            .deployments
112            .get(&ObjectRef::new(&console.deployment_name()).within(&namespace))
113        else {
114            return Ok(());
115        };
116
117        let Some(deployment_conditions) = &deployment
118            .status
119            .as_ref()
120            .and_then(|status| status.conditions.as_ref())
121        else {
122            // if the deployment doesn't have any conditions set yet, there
123            // is nothing to sync
124            return Ok(());
125        };
126
127        let ready = deployment_conditions
128            .iter()
129            .any(|condition| condition.type_ == "Available" && condition.status == "True");
130        let ready_str = if ready { "True" } else { "False" };
131
132        let mut status = console.status.clone().unwrap();
133        if status
134            .conditions
135            .iter()
136            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
137        {
138            // if the deployment status is already set correctly, we don't
139            // need to set it again (this prevents us from getting stuck in
140            // a reconcile loop)
141            return Ok(());
142        }
143
144        status.conditions = vec![Condition {
145            type_: "Ready".to_string(),
146            status: ready_str.to_string(),
147            last_transition_time: Time(chrono::offset::Utc::now()),
148            message: format!(
149                "console deployment is{} ready",
150                if ready { "" } else { " not" }
151            ),
152            observed_generation: None,
153            reason: "DeploymentStatus".to_string(),
154        }];
155        let mut new_console = console.clone();
156        new_console.status = Some(status);
157
158        console_api
159            .replace_status(
160                &console.name_unchecked(),
161                &PostParams::default(),
162                serde_json::to_vec(&new_console).unwrap(),
163            )
164            .await?;
165
166        Ok(())
167    }
168
169    fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
170        let mut network_policies = Vec::new();
171        if self.config.network_policies_ingress_enabled {
172            let console_label_selector = LabelSelector {
173                match_labels: Some(
174                    console
175                        .default_labels()
176                        .into_iter()
177                        .chain([("materialize.cloud/app".to_owned(), console.app_name())])
178                        .collect(),
179                ),
180                ..Default::default()
181            };
182            network_policies.extend([NetworkPolicy {
183                metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
184                spec: Some(NetworkPolicySpec {
185                    ingress: Some(vec![NetworkPolicyIngressRule {
186                        from: Some(
187                            self.config
188                                .network_policies_ingress_cidrs
189                                .iter()
190                                .map(|cidr| NetworkPolicyPeer {
191                                    ip_block: Some(IPBlock {
192                                        cidr: cidr.to_owned(),
193                                        except: None,
194                                    }),
195                                    ..Default::default()
196                                })
197                                .collect(),
198                        ),
199                        ports: Some(vec![NetworkPolicyPort {
200                            port: Some(IntOrString::Int(self.config.console_http_port.into())),
201                            protocol: Some("TCP".to_string()),
202                            ..Default::default()
203                        }]),
204                        ..Default::default()
205                    }]),
206                    pod_selector: Some(console_label_selector),
207                    policy_types: Some(vec!["Ingress".to_owned()]),
208                    ..Default::default()
209                }),
210            }]);
211        }
212        network_policies
213    }
214
215    fn create_console_external_certificate(&self, console: &Console) -> Option<Certificate> {
216        create_certificate(
217            self.config
218                .default_certificate_specs
219                .console_external
220                .clone(),
221            console,
222            console.spec.external_certificate_spec.clone(),
223            console.external_certificate_name(),
224            console.external_certificate_secret_name(),
225            None,
226            CertificatePrivateKeyAlgorithm::Rsa,
227            Some(4096),
228        )
229    }
230
231    fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
232        let version: String = console
233            .spec
234            .console_image_ref
235            .rsplitn(2, ':')
236            .next()
237            .expect("at least one chunk, even if empty")
238            .to_owned();
239        let app_config_json = serde_json::to_string(&AppConfig {
240            version,
241            auth: AppConfigAuth {
242                mode: console.spec.authenticator_kind,
243            },
244        })
245        .expect("known valid");
246        ConfigMap {
247            binary_data: None,
248            data: Some(btreemap! {
249                "app-config.json".to_owned() => app_config_json,
250            }),
251            immutable: None,
252            metadata: console.managed_resource_meta(console.configmap_name()),
253        }
254    }
255
256    fn create_console_deployment_object(&self, console: &Console) -> Deployment {
257        let mut pod_template_labels = console.default_labels();
258        pod_template_labels.insert(
259            "materialize.cloud/name".to_owned(),
260            console.deployment_name(),
261        );
262        pod_template_labels.insert("app".to_owned(), "console".to_string());
263        pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
264
265        let ports = vec![ContainerPort {
266            container_port: self.config.console_http_port.into(),
267            name: Some("http".into()),
268            protocol: Some("TCP".into()),
269            ..Default::default()
270        }];
271
272        let scheme = match console.spec.balancerd.scheme {
273            HttpConnectionScheme::Http => "http",
274            HttpConnectionScheme::Https => "https",
275        };
276        let mut env = vec![EnvVar {
277            name: "MZ_ENDPOINT".to_string(),
278            value: Some(format!(
279                "{}://{}.{}.svc.cluster.local:{}",
280                scheme,
281                console.spec.balancerd.service_name,
282                console.spec.balancerd.namespace,
283                self.config.balancerd_http_port,
284            )),
285            ..Default::default()
286        }];
287        let mut volumes = vec![Volume {
288            name: "app-config".to_string(),
289            config_map: Some(ConfigMapVolumeSource {
290                name: console.configmap_name(),
291                default_mode: Some(256),
292                optional: Some(false),
293                items: Some(vec![KeyToPath {
294                    key: "app-config.json".to_string(),
295                    path: "app-config.json".to_string(),
296                    ..Default::default()
297                }]),
298            }),
299            ..Default::default()
300        }];
301        let mut volume_mounts = vec![VolumeMount {
302            name: "app-config".to_string(),
303            mount_path: "/usr/share/nginx/html/app-config".to_string(),
304            ..Default::default()
305        }];
306
307        let scheme = if issuer_ref_defined(
308            &self.config.default_certificate_specs.console_external,
309            &console.spec.external_certificate_spec,
310        ) {
311            volumes.push(Volume {
312                name: "external-certificate".to_owned(),
313                secret: Some(SecretVolumeSource {
314                    default_mode: Some(0o400),
315                    secret_name: Some(console.external_certificate_secret_name()),
316                    items: None,
317                    optional: Some(false),
318                }),
319                ..Default::default()
320            });
321            volume_mounts.push(VolumeMount {
322                name: "external-certificate".to_owned(),
323                mount_path: "/nginx/tls".to_owned(),
324                read_only: Some(true),
325                ..Default::default()
326            });
327            env.push(EnvVar {
328                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
329                value: Some(format!(
330                    "listen {} ssl;
331ssl_certificate /nginx/tls/tls.crt;
332ssl_certificate_key /nginx/tls/tls.key;",
333                    self.config.console_http_port
334                )),
335                ..Default::default()
336            });
337            Some("HTTPS".to_owned())
338        } else {
339            env.push(EnvVar {
340                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
341                value: Some(format!("listen {};", self.config.console_http_port)),
342                ..Default::default()
343            });
344            Some("HTTP".to_owned())
345        };
346
347        let probe = Probe {
348            http_get: Some(HTTPGetAction {
349                path: Some("/".to_string()),
350                port: IntOrString::Int(self.config.console_http_port.into()),
351                scheme,
352                ..Default::default()
353            }),
354            ..Default::default()
355        };
356
357        let security_context = if self.config.enable_security_context {
358            // Since we want to adhere to the most restrictive security context, all
359            // of these fields have to be set how they are.
360            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
361            Some(SecurityContext {
362                run_as_non_root: Some(true),
363                capabilities: Some(Capabilities {
364                    drop: Some(vec!["ALL".to_string()]),
365                    ..Default::default()
366                }),
367                seccomp_profile: Some(SeccompProfile {
368                    type_: "RuntimeDefault".to_string(),
369                    ..Default::default()
370                }),
371                allow_privilege_escalation: Some(false),
372                ..Default::default()
373            })
374        } else {
375            None
376        };
377
378        let container = Container {
379            name: "console".to_owned(),
380            image: Some(console.spec.console_image_ref.clone()),
381            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
382            ports: Some(ports),
383            env: Some(env),
384            startup_probe: Some(Probe {
385                period_seconds: Some(1),
386                failure_threshold: Some(10),
387                ..probe.clone()
388            }),
389            readiness_probe: Some(Probe {
390                period_seconds: Some(30),
391                failure_threshold: Some(1),
392                ..probe.clone()
393            }),
394            liveness_probe: Some(Probe {
395                period_seconds: Some(30),
396                ..probe.clone()
397            }),
398            resources: console
399                .spec
400                .resource_requirements
401                .clone()
402                .or_else(|| self.config.console_default_resources.clone()),
403            security_context,
404            volume_mounts: Some(volume_mounts),
405            ..Default::default()
406        };
407
408        let deployment_spec = DeploymentSpec {
409            replicas: Some(console.replicas()),
410            selector: LabelSelector {
411                match_labels: Some(pod_template_labels.clone()),
412                ..Default::default()
413            },
414            template: PodTemplateSpec {
415                // not using managed_resource_meta because the pod should be
416                // owned by the deployment, not the materialize instance
417                metadata: Some(ObjectMeta {
418                    labels: Some(pod_template_labels),
419                    ..Default::default()
420                }),
421                spec: Some(PodSpec {
422                    containers: vec![container],
423                    node_selector: Some(
424                        self.config
425                            .console_node_selector
426                            .iter()
427                            .map(|selector| (selector.key.clone(), selector.value.clone()))
428                            .collect(),
429                    ),
430                    affinity: self.config.console_affinity.clone(),
431                    tolerations: self.config.console_tolerations.clone(),
432                    scheduler_name: self.config.scheduler_name.clone(),
433                    volumes: Some(volumes),
434                    security_context: Some(PodSecurityContext {
435                        fs_group: Some(101),
436                        ..Default::default()
437                    }),
438                    ..Default::default()
439                }),
440            },
441            ..Default::default()
442        };
443
444        Deployment {
445            metadata: ObjectMeta {
446                ..console.managed_resource_meta(console.deployment_name())
447            },
448            spec: Some(deployment_spec),
449            status: None,
450        }
451    }
452
453    fn create_console_service_object(&self, console: &Console) -> Service {
454        let selector =
455            btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
456
457        let ports = vec![ServicePort {
458            name: Some("http".to_string()),
459            protocol: Some("TCP".to_string()),
460            port: self.config.console_http_port.into(),
461            target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
462            ..Default::default()
463        }];
464
465        let spec = ServiceSpec {
466            type_: Some("ClusterIP".to_string()),
467            cluster_ip: Some("None".to_string()),
468            selector: Some(selector),
469            ports: Some(ports),
470            ..Default::default()
471        };
472
473        Service {
474            metadata: console.managed_resource_meta(console.service_name()),
475            spec: Some(spec),
476            status: None,
477        }
478    }
479
480    // TODO: remove this once everyone is upgraded to an orchestratord
481    // version with the separate console operator
482    async fn fix_deployment(
483        &self,
484        deployment_api: &Api<Deployment>,
485        new_deployment: &Deployment,
486    ) -> Result<(), Error> {
487        let Some(mut existing_deployment) = self
488            .deployments
489            .get(
490                &ObjectRef::new(&new_deployment.name_unchecked())
491                    .within(&new_deployment.namespace().unwrap()),
492            )
493            .map(Arc::unwrap_or_clone)
494        else {
495            return Ok(());
496        };
497
498        if existing_deployment.spec.as_ref().unwrap().selector
499            == new_deployment.spec.as_ref().unwrap().selector
500        {
501            return Ok(());
502        }
503
504        warn!("found existing deployment with old label selector, fixing");
505
506        // this is sufficient because the new labels are a superset of the
507        // old labels, so the existing label selector should still be valid
508        existing_deployment
509            .spec
510            .as_mut()
511            .unwrap()
512            .template
513            .metadata
514            .as_mut()
515            .unwrap()
516            .labels = new_deployment
517            .spec
518            .as_ref()
519            .unwrap()
520            .template
521            .metadata
522            .as_ref()
523            .unwrap()
524            .labels
525            .clone();
526
527        // using await_condition is not ideal in a controller loop, but this
528        // is very temporary and will only ever happen once, so this feels
529        // simpler than trying to introduce an entire state machine here
530        replace_resource(deployment_api, &existing_deployment).await?;
531        await_condition(
532            deployment_api.clone(),
533            &existing_deployment.name_unchecked(),
534            |deployment: Option<&Deployment>| {
535                let observed_generation = deployment
536                    .and_then(|deployment| deployment.status.as_ref())
537                    .and_then(|status| status.observed_generation)
538                    .unwrap_or(0);
539                let current_generation = deployment
540                    .and_then(|deployment| deployment.meta().generation)
541                    .unwrap_or(0);
542                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
543                observed_generation == current_generation
544                    && current_generation > previous_generation
545            },
546        )
547        .await
548        .map_err(|e| anyhow::anyhow!(e))?;
549        await_condition(
550            deployment_api.clone(),
551            &existing_deployment.name_unchecked(),
552            is_deployment_completed(),
553        )
554        .await
555        .map_err(|e| anyhow::anyhow!(e))?;
556
557        // delete the deployment but leave the pods around (via
558        // DeleteParams::orphan)
559        match kube::runtime::wait::delete::delete_and_finalize(
560            deployment_api.clone(),
561            &existing_deployment.name_unchecked(),
562            &DeleteParams::orphan(),
563        )
564        .await
565        {
566            Ok(_) => {}
567            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
568                if e.code == 404 =>
569            {
570                // the resource already doesn't exist
571            }
572            Err(e) => return Err(anyhow::anyhow!(e).into()),
573        }
574
575        // now, the normal apply of the new deployment (in the main loop)
576        // will take over the existing pods from the old deployment we just
577        // deleted, since we already updated the pod labels to be the same as
578        // the new label selector
579
580        Ok(())
581    }
582}
583
584#[async_trait::async_trait]
585impl k8s_controller::Context for Context {
586    type Resource = Console;
587    type Error = Error;
588
589    #[instrument(fields())]
590    async fn apply(
591        &self,
592        client: Client,
593        console: &Self::Resource,
594    ) -> Result<Option<Action>, Self::Error> {
595        if console.status.is_none() {
596            let console_api: Api<Console> =
597                Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
598            let mut new_console = console.clone();
599            new_console.status = Some(console.status());
600            console_api
601                .replace_status(
602                    &console.name_unchecked(),
603                    &PostParams::default(),
604                    serde_json::to_vec(&new_console).unwrap(),
605                )
606                .await?;
607            // Updating the status should trigger a reconciliation
608            // which will include a status this time.
609            return Ok(None);
610        }
611
612        let namespace = console.namespace();
613        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
614        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
615        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
616        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
617        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
618
619        trace!("creating new network policies");
620        let network_policies = self.create_network_policies(console);
621        for network_policy in &network_policies {
622            apply_resource(&network_policy_api, network_policy).await?;
623        }
624
625        trace!("creating new console configmap");
626        let console_configmap = self.create_console_app_configmap_object(console);
627        apply_resource(&configmap_api, &console_configmap).await?;
628
629        trace!("creating new console deployment");
630        let console_deployment = self.create_console_deployment_object(console);
631        self.fix_deployment(&deployment_api, &console_deployment)
632            .await?;
633        apply_resource(&deployment_api, &console_deployment).await?;
634
635        trace!("creating new console service");
636        let console_service = self.create_console_service_object(console);
637        apply_resource(&service_api, &console_service).await?;
638
639        let console_external_certificate = self.create_console_external_certificate(console);
640        if let Some(certificate) = &console_external_certificate {
641            trace!("creating new console external certificate");
642            apply_resource(&certificate_api, certificate).await?;
643        }
644
645        self.sync_deployment_status(&client, console).await?;
646
647        Ok(None)
648    }
649}