Skip to main content

mz_orchestratord/controller/
console.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use k8s_controller::TraceMetadata;
11use k8s_openapi::{
12    api::{
13        apps::v1::{Deployment, DeploymentSpec},
14        core::v1::{
15            Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
16            EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
17            ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18            ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
19        },
20        networking::v1::{
21            IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
22            NetworkPolicySpec,
23        },
24    },
25    apimachinery::pkg::{
26        apis::meta::v1::{Condition, LabelSelector, Time},
27        util::intstr::IntOrString,
28    },
29    jiff::Timestamp,
30};
31use kube::{
32    Api, Client, Resource, ResourceExt,
33    api::{DeleteParams, ObjectMeta, PostParams},
34    runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
35};
36use maplit::btreemap;
37use serde::Serialize;
38use tracing::{trace, warn};
39
40use crate::{
41    Error,
42    k8s::{apply_resource, get_resource, replace_resource},
43    tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
44};
45use mz_cloud_resources::crd::{
46    ManagedResource,
47    console::v1alpha1::{Console, HttpConnectionScheme},
48    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
49};
50use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
51use mz_ore::{cli::KeyValueArg, instrument};
52use mz_server_core::listeners::AuthenticatorKind;
53
/// Operator-level settings shared by every console this controller manages.
pub struct Config {
    /// When true, the console container is given a restrictive
    /// `SecurityContext` (non-root, all capabilities dropped, `RuntimeDefault`
    /// seccomp profile, no privilege escalation).
    pub enable_security_context: bool,
    /// NOTE(review): not read by any code in the visible part of this file;
    /// confirm whether it is still needed.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy applied to the console container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name set on the console pod spec, if any.
    pub scheduler_name: Option<String>,
    /// Key/value pairs copied into the console pod's node selector.
    pub console_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity rules copied onto the console pod spec.
    pub console_affinity: Option<Affinity>,
    /// Tolerations copied onto the console pod spec.
    pub console_tolerations: Option<Vec<Toleration>>,
    /// Resource requirements used for the console container when the
    /// `Console` spec does not provide its own.
    pub console_default_resources: Option<ResourceRequirements>,
    /// Whether an ingress `NetworkPolicy` is created for the console pods.
    pub network_policies_ingress_enabled: bool,
    /// CIDR blocks allowed to reach the console HTTP port by that policy.
    pub network_policies_ingress_cidrs: Vec<String>,

    /// Default certificate specs; this file reads the `console_external`
    /// entry as the fallback for the console's external certificate.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port nginx listens on; also used for the container port, service
    /// port, probes, and the ingress network policy.
    pub console_http_port: u16,
    /// Port of the balancerd HTTP endpoint, used to build `MZ_ENDPOINT`.
    pub balancerd_http_port: u16,
}
72
/// Payload serialized to JSON and stored under the `app-config.json` key of
/// the console's `ConfigMap`.
///
/// Field order here is the key order of the serialized JSON.
#[derive(Serialize)]
struct AppConfig {
    /// Text after the last `:` of the console image reference.
    version: String,
    /// Authentication settings exposed to the console.
    auth: AppConfigAuth,
    /// Omitted from the JSON entirely when `None`.
    // NOTE(review): `default` only influences deserialization, which this
    // type does not derive, so it appears to have no effect here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    balancerd_dns_names: Option<Vec<String>>,
}
80
/// The `auth` section of [`AppConfig`].
#[derive(Serialize)]
struct AppConfigAuth {
    /// Authenticator kind taken from the `Console` spec.
    mode: AuthenticatorKind,
}
85
/// Controller context for `Console` resources; holds the operator-level
/// [`Config`] used when generating the Kubernetes objects for a console.
pub struct Context {
    config: Config,
}
89
90impl Context {
    /// Creates a controller context that uses `config` for every console it
    /// reconciles.
    pub fn new(config: Config) -> Self {
        Self { config }
    }
94
95    async fn sync_deployment_status(
96        &self,
97        client: &Client,
98        console: &Console,
99    ) -> Result<(), Error> {
100        let namespace = console.namespace();
101        let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
102        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
103
104        let Some(deployment) = get_resource(&deployment_api, &console.deployment_name()).await?
105        else {
106            return Ok(());
107        };
108
109        let Some(deployment_conditions) = &deployment
110            .status
111            .as_ref()
112            .and_then(|status| status.conditions.as_ref())
113        else {
114            // if the deployment doesn't have any conditions set yet, there
115            // is nothing to sync
116            return Ok(());
117        };
118
119        let ready = deployment_conditions
120            .iter()
121            .any(|condition| condition.type_ == "Available" && condition.status == "True");
122        let ready_str = if ready { "True" } else { "False" };
123
124        let mut status = console.status.clone().unwrap();
125        if status
126            .conditions
127            .iter()
128            .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
129        {
130            // if the deployment status is already set correctly, we don't
131            // need to set it again (this prevents us from getting stuck in
132            // a reconcile loop)
133            return Ok(());
134        }
135
136        status.conditions = vec![Condition {
137            type_: "Ready".to_string(),
138            status: ready_str.to_string(),
139            last_transition_time: Time(Timestamp::now()),
140            message: format!(
141                "console deployment is{} ready",
142                if ready { "" } else { " not" }
143            ),
144            observed_generation: None,
145            reason: "DeploymentStatus".to_string(),
146        }];
147        let mut new_console = console.clone();
148        new_console.status = Some(status);
149
150        console_api
151            .replace_status(
152                &console.name_unchecked(),
153                &PostParams::default(),
154                &new_console,
155            )
156            .await?;
157
158        Ok(())
159    }
160
161    fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
162        let mut network_policies = Vec::new();
163        if self.config.network_policies_ingress_enabled {
164            let console_label_selector = LabelSelector {
165                match_labels: Some(
166                    console
167                        .default_labels()
168                        .into_iter()
169                        .chain([("materialize.cloud/app".to_owned(), console.app_name())])
170                        .collect(),
171                ),
172                ..Default::default()
173            };
174            network_policies.extend([NetworkPolicy {
175                metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
176                spec: Some(NetworkPolicySpec {
177                    ingress: Some(vec![NetworkPolicyIngressRule {
178                        from: Some(
179                            self.config
180                                .network_policies_ingress_cidrs
181                                .iter()
182                                .map(|cidr| NetworkPolicyPeer {
183                                    ip_block: Some(IPBlock {
184                                        cidr: cidr.to_owned(),
185                                        except: None,
186                                    }),
187                                    ..Default::default()
188                                })
189                                .collect(),
190                        ),
191                        ports: Some(vec![NetworkPolicyPort {
192                            port: Some(IntOrString::Int(self.config.console_http_port.into())),
193                            protocol: Some("TCP".to_string()),
194                            ..Default::default()
195                        }]),
196                        ..Default::default()
197                    }]),
198                    pod_selector: Some(console_label_selector),
199                    policy_types: Some(vec!["Ingress".to_owned()]),
200                    ..Default::default()
201                }),
202            }]);
203        }
204        network_policies
205    }
206
207    fn create_console_external_certificate(
208        &self,
209        console: &Console,
210    ) -> anyhow::Result<Option<Certificate>> {
211        create_certificate(
212            self.config
213                .default_certificate_specs
214                .console_external
215                .clone(),
216            console,
217            console.spec.external_certificate_spec.clone(),
218            console.external_certificate_name(),
219            console.external_certificate_secret_name(),
220            None,
221            CertificatePrivateKeyAlgorithm::Ecdsa,
222            Some(256),
223        )
224    }
225
226    fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
227        let balancerd_dns_names = console.spec.balancerd.dns_names.clone();
228        let version: String = console
229            .spec
230            .console_image_ref
231            .rsplitn(2, ':')
232            .next()
233            .expect("at least one chunk, even if empty")
234            .to_owned();
235        let app_config_json = serde_json::to_string(&AppConfig {
236            version,
237            balancerd_dns_names,
238            auth: AppConfigAuth {
239                mode: console.spec.authenticator_kind,
240            },
241        })
242        .expect("known valid");
243        ConfigMap {
244            binary_data: None,
245            data: Some(btreemap! {
246                "app-config.json".to_owned() => app_config_json,
247            }),
248            immutable: None,
249            metadata: console.managed_resource_meta(console.configmap_name()),
250        }
251    }
252
253    fn create_console_deployment_object(&self, console: &Console) -> Deployment {
254        let mut pod_template_labels = console.default_labels();
255        pod_template_labels.insert(
256            "materialize.cloud/name".to_owned(),
257            console.deployment_name(),
258        );
259        pod_template_labels.insert("app".to_owned(), "console".to_string());
260        pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
261
262        let ports = vec![ContainerPort {
263            container_port: self.config.console_http_port.into(),
264            name: Some("http".into()),
265            protocol: Some("TCP".into()),
266            ..Default::default()
267        }];
268
269        let scheme = match console.spec.balancerd.scheme {
270            HttpConnectionScheme::Http => "http",
271            HttpConnectionScheme::Https => "https",
272        };
273        let mut env = vec![EnvVar {
274            name: "MZ_ENDPOINT".to_string(),
275            value: Some(format!(
276                "{}://{}.{}.svc.cluster.local:{}",
277                scheme,
278                console.spec.balancerd.service_name,
279                console.spec.balancerd.namespace,
280                self.config.balancerd_http_port,
281            )),
282            ..Default::default()
283        }];
284        let mut volumes = vec![Volume {
285            name: "app-config".to_string(),
286            config_map: Some(ConfigMapVolumeSource {
287                name: console.configmap_name(),
288                default_mode: Some(256),
289                optional: Some(false),
290                items: Some(vec![KeyToPath {
291                    key: "app-config.json".to_string(),
292                    path: "app-config.json".to_string(),
293                    ..Default::default()
294                }]),
295            }),
296            ..Default::default()
297        }];
298        let mut volume_mounts = vec![VolumeMount {
299            name: "app-config".to_string(),
300            mount_path: "/usr/share/nginx/html/app-config".to_string(),
301            ..Default::default()
302        }];
303
304        let scheme = if issuer_ref_defined(
305            &self.config.default_certificate_specs.console_external,
306            &console.spec.external_certificate_spec,
307        ) {
308            volumes.push(Volume {
309                name: "external-certificate".to_owned(),
310                secret: Some(SecretVolumeSource {
311                    default_mode: Some(0o400),
312                    secret_name: Some(console.external_certificate_secret_name()),
313                    items: None,
314                    optional: Some(false),
315                }),
316                ..Default::default()
317            });
318            volume_mounts.push(VolumeMount {
319                name: "external-certificate".to_owned(),
320                mount_path: "/nginx/tls".to_owned(),
321                read_only: Some(true),
322                ..Default::default()
323            });
324            env.push(EnvVar {
325                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
326                value: Some(format!(
327                    "listen {} ssl;
328ssl_certificate /nginx/tls/tls.crt;
329ssl_certificate_key /nginx/tls/tls.key;",
330                    self.config.console_http_port
331                )),
332                ..Default::default()
333            });
334            Some("HTTPS".to_owned())
335        } else {
336            env.push(EnvVar {
337                name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
338                value: Some(format!("listen {};", self.config.console_http_port)),
339                ..Default::default()
340            });
341            Some("HTTP".to_owned())
342        };
343
344        let probe = Probe {
345            http_get: Some(HTTPGetAction {
346                path: Some("/".to_string()),
347                port: IntOrString::Int(self.config.console_http_port.into()),
348                scheme,
349                ..Default::default()
350            }),
351            ..Default::default()
352        };
353
354        let security_context = if self.config.enable_security_context {
355            // Since we want to adhere to the most restrictive security context, all
356            // of these fields have to be set how they are.
357            // See https://kubernetes.io/docs/concepts/security/pod-security-standards/#restricted
358            Some(SecurityContext {
359                run_as_non_root: Some(true),
360                capabilities: Some(Capabilities {
361                    drop: Some(vec!["ALL".to_string()]),
362                    ..Default::default()
363                }),
364                seccomp_profile: Some(SeccompProfile {
365                    type_: "RuntimeDefault".to_string(),
366                    ..Default::default()
367                }),
368                allow_privilege_escalation: Some(false),
369                ..Default::default()
370            })
371        } else {
372            None
373        };
374
375        let container = Container {
376            name: "console".to_owned(),
377            image: Some(console.spec.console_image_ref.clone()),
378            image_pull_policy: Some(self.config.image_pull_policy.to_string()),
379            ports: Some(ports),
380            env: Some(env),
381            startup_probe: Some(Probe {
382                period_seconds: Some(1),
383                failure_threshold: Some(10),
384                ..probe.clone()
385            }),
386            readiness_probe: Some(Probe {
387                period_seconds: Some(30),
388                failure_threshold: Some(1),
389                ..probe.clone()
390            }),
391            liveness_probe: Some(Probe {
392                period_seconds: Some(30),
393                ..probe.clone()
394            }),
395            resources: console
396                .spec
397                .resource_requirements
398                .clone()
399                .or_else(|| self.config.console_default_resources.clone()),
400            security_context,
401            volume_mounts: Some(volume_mounts),
402            ..Default::default()
403        };
404
405        let deployment_spec = DeploymentSpec {
406            replicas: Some(console.replicas()),
407            selector: LabelSelector {
408                match_labels: Some(pod_template_labels.clone()),
409                ..Default::default()
410            },
411            template: PodTemplateSpec {
412                // not using managed_resource_meta because the pod should be
413                // owned by the deployment, not the materialize instance
414                metadata: Some(ObjectMeta {
415                    labels: Some(pod_template_labels),
416                    ..Default::default()
417                }),
418                spec: Some(PodSpec {
419                    containers: vec![container],
420                    node_selector: Some(
421                        self.config
422                            .console_node_selector
423                            .iter()
424                            .map(|selector| (selector.key.clone(), selector.value.clone()))
425                            .collect(),
426                    ),
427                    affinity: self.config.console_affinity.clone(),
428                    tolerations: self.config.console_tolerations.clone(),
429                    scheduler_name: self.config.scheduler_name.clone(),
430                    volumes: Some(volumes),
431                    security_context: Some(PodSecurityContext {
432                        fs_group: Some(101),
433                        ..Default::default()
434                    }),
435                    ..Default::default()
436                }),
437            },
438            ..Default::default()
439        };
440
441        Deployment {
442            metadata: ObjectMeta {
443                ..console.managed_resource_meta(console.deployment_name())
444            },
445            spec: Some(deployment_spec),
446            status: None,
447        }
448    }
449
450    fn create_console_service_object(&self, console: &Console) -> Service {
451        let selector =
452            btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
453
454        let ports = vec![ServicePort {
455            name: Some("http".to_string()),
456            protocol: Some("TCP".to_string()),
457            port: self.config.console_http_port.into(),
458            target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
459            ..Default::default()
460        }];
461
462        let spec = ServiceSpec {
463            type_: Some("ClusterIP".to_string()),
464            cluster_ip: Some("None".to_string()),
465            selector: Some(selector),
466            ports: Some(ports),
467            ..Default::default()
468        };
469
470        Service {
471            metadata: console.managed_resource_meta(console.service_name()),
472            spec: Some(spec),
473            status: None,
474        }
475    }
476
477    // TODO: remove this once everyone is upgraded to an orchestratord
478    // version with the separate console operator
479    async fn fix_deployment(
480        &self,
481        deployment_api: &Api<Deployment>,
482        new_deployment: &Deployment,
483    ) -> Result<(), Error> {
484        let Some(mut existing_deployment) =
485            get_resource(deployment_api, &new_deployment.name_unchecked()).await?
486        else {
487            return Ok(());
488        };
489
490        if existing_deployment.spec.as_ref().unwrap().selector
491            == new_deployment.spec.as_ref().unwrap().selector
492        {
493            return Ok(());
494        }
495
496        warn!("found existing deployment with old label selector, fixing");
497
498        // this is sufficient because the new labels are a superset of the
499        // old labels, so the existing label selector should still be valid
500        existing_deployment
501            .spec
502            .as_mut()
503            .unwrap()
504            .template
505            .metadata
506            .as_mut()
507            .unwrap()
508            .labels = new_deployment
509            .spec
510            .as_ref()
511            .unwrap()
512            .template
513            .metadata
514            .as_ref()
515            .unwrap()
516            .labels
517            .clone();
518
519        // using await_condition is not ideal in a controller loop, but this
520        // is very temporary and will only ever happen once, so this feels
521        // simpler than trying to introduce an entire state machine here
522        replace_resource(deployment_api, &existing_deployment).await?;
523        await_condition(
524            deployment_api.clone(),
525            &existing_deployment.name_unchecked(),
526            |deployment: Option<&Deployment>| {
527                let observed_generation = deployment
528                    .and_then(|deployment| deployment.status.as_ref())
529                    .and_then(|status| status.observed_generation)
530                    .unwrap_or(0);
531                let current_generation = deployment
532                    .and_then(|deployment| deployment.meta().generation)
533                    .unwrap_or(0);
534                let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
535                observed_generation == current_generation
536                    && current_generation > previous_generation
537            },
538        )
539        .await
540        .map_err(|e| anyhow::anyhow!(e))?;
541        await_condition(
542            deployment_api.clone(),
543            &existing_deployment.name_unchecked(),
544            is_deployment_completed(),
545        )
546        .await
547        .map_err(|e| anyhow::anyhow!(e))?;
548
549        // delete the deployment but leave the pods around (via
550        // DeleteParams::orphan)
551        match kube::runtime::wait::delete::delete_and_finalize(
552            deployment_api.clone(),
553            &existing_deployment.name_unchecked(),
554            &DeleteParams::orphan(),
555        )
556        .await
557        {
558            Ok(_) => {}
559            Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
560                if e.code == 404 =>
561            {
562                // the resource already doesn't exist
563            }
564            Err(e) => return Err(anyhow::anyhow!(e).into()),
565        }
566
567        // now, the normal apply of the new deployment (in the main loop)
568        // will take over the existing pods from the old deployment we just
569        // deleted, since we already updated the pod labels to be the same as
570        // the new label selector
571
572        Ok(())
573    }
574}
575
576#[async_trait::async_trait]
577impl k8s_controller::Context for Context {
578    type Resource = Console;
579    type Error = Error;
580
581    #[instrument(fields())]
582    async fn apply(
583        &self,
584        client: Client,
585        console: &Self::Resource,
586        _metadata: &mut TraceMetadata,
587    ) -> Result<Option<Action>, Self::Error> {
588        if console.status.is_none() {
589            let console_api: Api<Console> =
590                Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
591            let mut new_console = console.clone();
592            new_console.status = Some(console.status());
593            console_api
594                .replace_status(
595                    &console.name_unchecked(),
596                    &PostParams::default(),
597                    &new_console,
598                )
599                .await?;
600            // Updating the status should trigger a reconciliation
601            // which will include a status this time.
602            return Ok(None);
603        }
604
605        let namespace = console.namespace();
606        let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
607        let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
608        let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
609        let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
610        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
611
612        trace!("creating new network policies");
613        let network_policies = self.create_network_policies(console);
614        for network_policy in &network_policies {
615            apply_resource(&network_policy_api, network_policy).await?;
616        }
617
618        trace!("creating new console configmap");
619        let console_configmap = self.create_console_app_configmap_object(console);
620        apply_resource(&configmap_api, &console_configmap).await?;
621
622        trace!("creating new console deployment");
623        let console_deployment = self.create_console_deployment_object(console);
624        self.fix_deployment(&deployment_api, &console_deployment)
625            .await?;
626        apply_resource(&deployment_api, &console_deployment).await?;
627
628        trace!("creating new console service");
629        let console_service = self.create_console_service_object(console);
630        apply_resource(&service_api, &console_service).await?;
631
632        let console_external_certificate = self.create_console_external_certificate(console)?;
633        if let Some(certificate) = &console_external_certificate {
634            trace!("creating new console external certificate");
635            apply_resource(&certificate_api, certificate).await?;
636        }
637
638        self.sync_deployment_status(&client, console).await?;
639
640        Ok(None)
641    }
642}