mz_orchestratord/controller/materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    fmt::Display,
13    str::FromStr,
14    sync::{Arc, Mutex},
15};
16
17use http::HeaderValue;
18use k8s_openapi::{
19    api::core::v1::{Affinity, Toleration},
20    apimachinery::pkg::apis::meta::v1::{Condition, Time},
21};
22use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
23use serde::Deserialize;
24use tracing::{debug, trace};
25use uuid::Uuid;
26
27use crate::metrics::Metrics;
28use mz_cloud_provider::CloudProvider;
29use mz_cloud_resources::crd::materialize::v1alpha1::{
30    Materialize, MaterializeCertSpec, MaterializeStatus,
31};
32use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
33use mz_orchestrator_tracing::TracingCliArgs;
34use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
35
36pub mod balancer;
37pub mod console;
38pub mod environmentd;
39pub mod tls;
40
// Command-line configuration for the `Materialize` controller, parsed with
// clap. Plain `//` comments are used deliberately here: `///` doc comments on
// a clap derive become `--help` text, which would change program output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // When this is `aws`, `--aws-account-id` (in `aws_info`) must also be
    // given; `Context::new` asserts this.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, `apply` applies the balancer resources; otherwise it cleans
    // them up.
    #[clap(long)]
    create_balancers: bool,
    // When set, `apply` applies the console resources; otherwise it cleans
    // them up.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag used when `console_image_tag_map` has no entry whose
    // key equals the environmentd image tag.
    #[clap(long)]
    console_image_tag_default: String,
    // Maps an environmentd image tag (key) to the console image tag (value)
    // to deploy alongside it; presumably passed as `key=value` — confirm with
    // `KeyValueArg`.
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    // NOTE(review): `default_value = "false"` is likely redundant for a clap
    // `bool` flag, which already defaults to false.
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Per-component pod scheduling constraints. Affinities are JSON parsed by
    // `parse_affinity`. Each `--<component>-toleration` value is one JSON
    // toleration parsed by `parse_tolerations`; the flag may be repeated to
    // build the list.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Optional overrides for cluster replica sizes and replication factors;
    // presumably forwarded to environmentd — confirm in `environmentd.rs`.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // The defaults list local development origins plus the staging console.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // Port numbers for the environmentd listeners (consumed outside this file).
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // Port numbers for the balancerd listeners (consumed outside this file).
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON object parsed via `DefaultCertificateSpecs::from_str`; defaults to
    // the empty object `{}`.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help`.
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
175
176fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
177    Ok(serde_json::from_str(s)?)
178}
179
180fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
181    Ok(serde_json::from_str(s)?)
182}
183
184#[derive(Clone, Deserialize, Default)]
185#[serde(rename_all = "camelCase")]
186pub struct DefaultCertificateSpecs {
187    balancerd_external: Option<MaterializeCertSpec>,
188    console_external: Option<MaterializeCertSpec>,
189    internal: Option<MaterializeCertSpec>,
190}
191
192impl FromStr for DefaultCertificateSpecs {
193    type Err = serde_json::Error;
194
195    fn from_str(s: &str) -> Result<Self, Self::Err> {
196        serde_json::from_str(s)
197    }
198}
199
// AWS-specific controller settings, flattened into `MaterializeControllerArgs`.
// Plain `//` comments are used so clap `--help` output is unchanged.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // Required when `--cloud-provider=aws`; `Context::new` asserts it is set.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
213
// Network policy settings, flattened into `MaterializeControllerArgs`. Every
// flag carries an explicit `network-policies-` prefix. Plain `//` comments are
// used so clap `--help` output is unchanged.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // CIDR strings; presumably only consulted when ingress policies are
    // enabled — confirm where `NetworkPolicyConfig` is consumed.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // CIDR strings; presumably only consulted when egress policies are
    // enabled — confirm where `NetworkPolicyConfig` is consumed.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
231
/// Errors returned by the `Materialize` controller's reconciliation hooks.
///
/// No variant carries an `#[error(...)]` attribute; `Display` is implemented
/// manually and prints the wrapped error. `#[from]` provides the `From`
/// conversions that let `?` lift `anyhow` and `kube` errors into this type.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Any error reported through `anyhow`.
    Anyhow(#[from] anyhow::Error),
    /// An error from the Kubernetes client.
    Kube(#[from] kube::Error),
}
237
238impl Display for Error {
239    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240        match self {
241            Self::Anyhow(e) => write!(f, "{e}"),
242            Self::Kube(e) => write!(f, "{e}"),
243        }
244    }
245}
246
247pub struct Context {
248    config: MaterializeControllerArgs,
249    tracing: TracingCliArgs,
250    orchestratord_namespace: String,
251    metrics: Arc<Metrics>,
252    needs_update: Arc<Mutex<BTreeSet<String>>>,
253}
254
255impl Context {
256    pub fn new(
257        config: MaterializeControllerArgs,
258        tracing: TracingCliArgs,
259        orchestratord_namespace: String,
260        metrics: Arc<Metrics>,
261    ) -> Self {
262        if config.cloud_provider == CloudProvider::Aws {
263            assert!(
264                config.aws_info.aws_account_id.is_some(),
265                "--aws-account-id is required when using --cloud-provider=aws"
266            );
267        }
268
269        Self {
270            config,
271            tracing,
272            orchestratord_namespace,
273            metrics,
274            needs_update: Default::default(),
275        }
276    }
277
278    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
279        let mut needs_update_set = self.needs_update.lock().unwrap();
280        if needs_update {
281            needs_update_set.insert(mz.name_unchecked());
282        } else {
283            needs_update_set.remove(&mz.name_unchecked());
284        }
285        self.metrics
286            .environmentd_needs_update
287            .set(u64::cast_from(needs_update_set.len()));
288    }
289
290    async fn update_status(
291        &self,
292        mz_api: &Api<Materialize>,
293        mz: &Materialize,
294        status: MaterializeStatus,
295        needs_update: bool,
296    ) -> Result<Materialize, kube::Error> {
297        self.set_needs_update(mz, needs_update);
298
299        let mut new_mz = mz.clone();
300        if !mz
301            .status
302            .as_ref()
303            .map_or(true, |mz_status| mz_status.needs_update(&status))
304        {
305            return Ok(new_mz);
306        }
307
308        new_mz.status = Some(status);
309        mz_api
310            .replace_status(
311                &mz.name_unchecked(),
312                &PostParams::default(),
313                serde_json::to_vec(&new_mz).unwrap(),
314            )
315            .await
316    }
317}
318
// `k8s_controller` reconciliation hooks for `Materialize` resources.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    // Finalizer name for `Materialize` objects; presumably it makes
    // `k8s_controller` run `cleanup` before the object is deleted — confirm
    // against the k8s_controller documentation.
    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles a single `Materialize` resource.
    ///
    /// In order, this:
    /// 1. Writes the status computed by `mz.status()` if the resource has
    ///    none yet, then returns so the status write triggers the next
    ///    reconciliation.
    /// 2. Replaces a nil `request_rollout` or `environment_id` in the spec
    ///    with a fresh UUID, then returns for the same reason.
    /// 3. Compares the stored `resources_hash` with the hash of the
    ///    environmentd resources for the active generation.
    ///    - If they differ and a rollout was requested, it applies the
    ///      resources (into the next generation unless `in_place_rollout()`),
    ///      promotes services, and tears down the previous generation.
    ///    - Otherwise it only records the matching `UpToDate` condition.
    /// 4. Once step 3 yields `Ok(None)`, applies or cleans up the balancer
    ///    (per `create_balancers`), then the console (per `create_console`).
    ///
    /// If step 3 or the balancer step yields anything other than `Ok(None)`,
    /// that value is returned immediately and the later steps are skipped.
    /// This covers both errors and an `Action` (for example while the new
    /// environmentd is not yet ready).
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());

        let status = mz.status();
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            // Updating the status should trigger a reconciliation
            // which will include a status this time.
            return Ok(None);
        }

        // Assign fresh UUIDs to spec identifiers that are still nil, and
        // write the spec back.
        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                mz.spec.environment_id = Uuid::new_v4();
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            // Updating the spec should also trigger a reconciliation.
            // We can't do that as part of the above check because you can't
            // update both the spec and the status in a single api call.
            return Ok(None);
        }

        // we compare the hash against the environment resources generated
        // for the current active generation, since that's what we expect to
        // have been applied earlier, but we don't want to use these
        // environment resources because when we apply them, we want to apply
        // them with data that uses the new generation
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        // An in-place rollout re-applies the active generation instead of
        // moving to a new one.
        let increment_generation = has_current_changes && !mz.in_place_rollout();
        let desired_generation = if increment_generation {
            next_generation
        } else {
            active_generation
        };

        // here we regenerate the environment resources using the
        // same inputs except with an updated generation
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        // `result` holds the outcome of the environmentd step. If that is
        // `Ok(None)`, the balancer step below overwrites it.
        let mut result = if has_current_changes {
            if mz.rollout_requested() {
                // we remove the environment resources hash annotation here
                // because if we fail halfway through applying the resources,
                // things will be in an inconsistent state, and we don't want
                // to allow the possibility of the user making a second
                // change which reverts to the original state and then
                // skipping retrying this apply, since that would leave
                // things in a permanently inconsistent state.
                // note that environment.spec will be empty here after
                // replace_status, but this is fine because we already
                // extracted all of the information we want from the spec
                // earlier.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            // don't update the reconciliation id yet,
                            // because the rollout hasn't yet completed. if
                            // we fail later on, we want to ensure that the
                            // rollout gets retried.
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // From here on, work with the resource returned by
                // `update_status` and the status read back from it.
                let mz = &mz;
                let status = mz.status();

                trace!("applying environment resources");
                match resources
                    .apply(
                        &client,
                        increment_generation,
                        mz.should_force_promote(),
                        &mz.namespace(),
                    )
                    .await
                {
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // do this last, so that we keep traffic pointing at
                        // the previous environmentd until the new one is
                        // fully ready
                        resources.promote_services(&client, &mz.namespace()).await?;
                        if increment_generation {
                            resources
                                .teardown_generation(&client, mz, active_generation)
                                .await?;
                        }
                        // Record the rollout as complete: advance the active
                        // generation and store the new resources hash.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation: desired_generation,
                                last_completed_rollout_request: mz.requested_reconciliation_id(),
                                resource_id: status.resource_id,
                                resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "True".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Successfully applied changes for generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Applied".into(),
                                }],
                            },
                            false,
                        )
                        .await?;
                        Ok(None)
                    }
                    Err(e) => {
                        // On failure, tear down `next_generation` (this happens
                        // even for an in-place rollout), record a FailedDeploy
                        // condition while keeping the active generation, and
                        // return the error.
                        // TODO should we actually tear things down here?
                        // I don't think we should. This might have some weird behavior if we hit
                        // transient errors. We might end up tearing down many hours of progress on
                        // rehydration due to a transient failure to reapply an already working K8S
                        // object.
                        resources
                            .teardown_generation(&client, mz, next_generation)
                            .await?;
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                // also don't update the reconciliation id
                                // here, because there was an error during
                                // the rollout and we want to ensure it gets
                                // retried.
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            } else {
                // Changes exist but no rollout was requested. If
                // `update_in_progress()` is true, tear down `next_generation`.
                // Then report the WaitingForApproval condition when anything
                // needs updating.
                let mut needs_update = mz.conditions_need_update();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
        } else {
            // this can happen if we update the environment, but then revert
            // that update before the update was deployed. in this case, we
            // don't want the environment to still show up as
            // WaitingForApproval.
            let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
            if mz.update_in_progress() {
                resources
                    .teardown_generation(&client, mz, next_generation)
                    .await?;
                needs_update = true;
            }
            if needs_update {
                self.update_status(
                    &mz_api,
                    mz,
                    MaterializeStatus {
                        active_generation,
                        last_completed_rollout_request: mz.requested_reconciliation_id(),
                        resource_id: status.resource_id,
                        resources_hash: status.resources_hash,
                        conditions: vec![Condition {
                            type_: "UpToDate".into(),
                            status: "True".into(),
                            last_transition_time: Time(chrono::offset::Utc::now()),
                            message: format!(
                                "No changes found from generation {active_generation}"
                            ),
                            observed_generation: mz.meta().generation,
                            reason: "Applied".into(),
                        }],
                    },
                    active_generation != desired_generation,
                )
                .await?;
            }
            debug!("no changes");
            Ok(None)
        };

        // balancers rely on the environmentd service existing, which is
        // enforced by the environmentd rollout process being able to call
        // into the promotion endpoint

        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // The balancer outcome replaces `result` and is what `apply` finally
        // returns if the console step below succeeds.
        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await;
        }

        // and the console relies on the balancer service existing, which is
        // enforced by balancer::Resources::apply having a check for its pods
        // being up, and not returning successfully until they are

        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // Choose the console image tag. The environmentd image tag (the text
        // after the last ':') is looked up in `console_image_tag_map`, falling
        // back to `console_image_tag_default`.
        // NOTE(review): splitting on the last ':' mis-parses a ref with a
        // registry port and no tag (e.g. `host:5000/repo/environmentd`). It
        // only yields a map miss here, but it should be confirmed.
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        // NOTE(review): only errors from the console step are propagated (via
        // `?`). Any `Ok` value it returns is discarded, so if `console.apply`
        // can return a requeue `Action`, it is lost — confirm its return type.
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        result.map_err(Error::Anyhow)
    }

    /// Cleanup hook for a `Materialize` resource (presumably invoked on
    /// deletion — confirm with k8s_controller).
    ///
    /// It only removes the resource from the needs-update tracking. It does
    /// not touch any Kubernetes objects itself; the client argument is
    /// unused.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
662
/// Builds the reference of a sibling image (e.g. `console`, `balancerd`) in
/// the same repository namespace as the environmentd image.
///
/// The result has the form `{namespace}/{image_name}:{tag}`:
///
/// * `namespace` is everything before the last `/` of
///   `environmentd_image_ref`, including any registry host and port. It is
///   `materialize` when the ref contains no `/`.
/// * `tag` is `image_tag` when given. Otherwise it is the tag of the
///   environmentd image, or `unstable` when that ref carries no tag.
///
/// Only the final path component is searched for the `:` tag separator, so a
/// registry port such as `localhost:5000/...` is never mistaken for a tag.
/// Any `@digest` suffix is ignored: a digest pins the environmentd image
/// itself and cannot identify a different image.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    let (namespace, name_and_reference) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        // Strip a `@sha256:...` digest before looking for a tag; otherwise
        // the digest's hex would be taken as the tag.
        let name_and_tag = name_and_reference
            .split_once('@')
            .map_or(name_and_reference, |(name_and_tag, _digest)| name_and_tag);
        name_and_tag
            .rsplit_once(':')
            .map_or("unstable", |(_name, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}