mz_orchestratord/controller/
materialize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    fmt::Display,
13    str::FromStr,
14    sync::{Arc, Mutex},
15};
16
17use http::HeaderValue;
18use k8s_openapi::{
19    api::core::v1::{Affinity, ResourceRequirements, Toleration},
20    apimachinery::pkg::apis::meta::v1::{Condition, Time},
21};
22use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
23use serde::Deserialize;
24use tracing::{debug, trace};
25use uuid::Uuid;
26
27use crate::metrics::Metrics;
28use mz_cloud_provider::CloudProvider;
29use mz_cloud_resources::crd::materialize::v1alpha1::{
30    Materialize, MaterializeCertSpec, MaterializeStatus,
31};
32use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
33use mz_orchestrator_tracing::TracingCliArgs;
34use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
35
36pub mod balancer;
37pub mod console;
38pub mod environmentd;
39pub mod tls;
40
// Command-line configuration for the Materialize controller.
//
// NOTE: the comments in this struct are deliberately plain `//` comments.
// clap's derive macro turns `///` doc comments on a struct and its fields
// into `--help` text, so doc comments here would change the program's
// output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // --- Deployment identity and feature toggles ---
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, balancer resources are applied during reconciliation;
    // otherwise they are cleaned up.
    #[clap(long)]
    create_balancers: bool,
    // When set, console resources are applied during reconciliation;
    // otherwise they are cleaned up.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    // --- Segment settings ---
    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // --- Console image selection ---
    // `console_image_tag_map` is searched for an entry whose key equals the
    // environmentd image tag; `console_image_tag_default` is used when no
    // entry matches.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific flags, flattened into this argument set.
    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // --- Scheduling constraints and default resources ---
    // Affinity, toleration and resource flags take JSON values, parsed by
    // `parse_affinity`, `parse_tolerations` and `parse_resources`. Each
    // `--<component>-toleration` value is parsed as ONE `Toleration`.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    environmentd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    balancerd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    console_default_resources: Option<ResourceRequirements>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // `--network-policies-*` flags, flattened into this argument set.
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // --- Cluster replica sizing and bootstrap settings ---
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // --- HTTP origins / console proxy ---
    // Each default value must parse as an `http::HeaderValue`.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // --- environmentd ports ---
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // --- balancerd ports ---
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    // --- console port ---
    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON object parsed through `DefaultCertificateSpecs::from_str`; the
    // default `{}` leaves every certificate spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help` output (`hide = true`).
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
181
182fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
183    Ok(serde_json::from_str(s)?)
184}
185
186fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
187    Ok(serde_json::from_str(s)?)
188}
189
190fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
191    Ok(serde_json::from_str(s)?)
192}
193
/// Default certificate specs, deserialized from a JSON object.
///
/// Keys are camelCase (`balancerdExternal`, `consoleExternal`, `internal`).
/// Every key is optional, and the `Default` value leaves all of them unset.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// NOTE(review): presumably the default spec for balancerd's external
    /// certificate; confirm against the modules that read it.
    balancerd_external: Option<MaterializeCertSpec>,
    /// NOTE(review): presumably the default spec for the console's external
    /// certificate; confirm against the modules that read it.
    console_external: Option<MaterializeCertSpec>,
    /// NOTE(review): presumably the default spec for internal certificates;
    /// confirm against the modules that read it.
    internal: Option<MaterializeCertSpec>,
}
201
202impl FromStr for DefaultCertificateSpecs {
203    type Err = serde_json::Error;
204
205    fn from_str(s: &str) -> Result<Self, Self::Err> {
206        serde_json::from_str(s)
207    }
208}
209
// AWS-specific command-line flags.
//
// NOTE: plain `//` comments are used on purpose; clap's derive macro turns
// `///` doc comments into `--help` text.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // `Context::new` asserts this is set when `--cloud-provider=aws`.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
223
// Network policy command-line flags. Every flag gets an explicit
// `network-policies-` prefix because the field names alone do not carry it.
//
// NOTE: plain `//` comments are used on purpose; clap's derive macro turns
// `///` doc comments into `--help` text.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // NOTE(review): presumably CIDR blocks allowed for ingress; the values
    // are kept as unvalidated strings here.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // NOTE(review): presumably CIDR blocks allowed for egress; the values
    // are kept as unvalidated strings here.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
241
/// Errors produced while reconciling a `Materialize` resource.
///
/// Both variants carry `#[from]`, so `?` converts `anyhow::Error` and
/// `kube::Error` into this type automatically. No `#[error(...)]` attributes
/// are given; `Display` is implemented by hand elsewhere in this file.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Any error reported as an `anyhow::Error`.
    Anyhow(#[from] anyhow::Error),
    /// An error returned by the `kube` client.
    Kube(#[from] kube::Error),
}
247
248impl Display for Error {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        match self {
251            Self::Anyhow(e) => write!(f, "{e}"),
252            Self::Kube(e) => write!(f, "{e}"),
253        }
254    }
255}
256
/// Shared state handed to every reconciliation of a `Materialize` resource.
pub struct Context {
    /// Parsed controller command-line arguments.
    config: MaterializeControllerArgs,
    /// Tracing arguments, passed on when generating environmentd resources.
    tracing: TracingCliArgs,
    /// Namespace passed on when generating environmentd resources.
    /// NOTE(review): presumably the namespace orchestratord itself runs in.
    orchestratord_namespace: String,
    /// Metrics registry; `environmentd_needs_update` mirrors the size of
    /// `needs_update`.
    metrics: Arc<Metrics>,
    /// Names of the `Materialize` resources currently flagged as needing an
    /// update.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
264
265impl Context {
266    pub fn new(
267        config: MaterializeControllerArgs,
268        tracing: TracingCliArgs,
269        orchestratord_namespace: String,
270        metrics: Arc<Metrics>,
271    ) -> Self {
272        if config.cloud_provider == CloudProvider::Aws {
273            assert!(
274                config.aws_info.aws_account_id.is_some(),
275                "--aws-account-id is required when using --cloud-provider=aws"
276            );
277        }
278
279        Self {
280            config,
281            tracing,
282            orchestratord_namespace,
283            metrics,
284            needs_update: Default::default(),
285        }
286    }
287
288    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
289        let mut needs_update_set = self.needs_update.lock().unwrap();
290        if needs_update {
291            needs_update_set.insert(mz.name_unchecked());
292        } else {
293            needs_update_set.remove(&mz.name_unchecked());
294        }
295        self.metrics
296            .environmentd_needs_update
297            .set(u64::cast_from(needs_update_set.len()));
298    }
299
300    async fn update_status(
301        &self,
302        mz_api: &Api<Materialize>,
303        mz: &Materialize,
304        status: MaterializeStatus,
305        needs_update: bool,
306    ) -> Result<Materialize, kube::Error> {
307        self.set_needs_update(mz, needs_update);
308
309        let mut new_mz = mz.clone();
310        if !mz
311            .status
312            .as_ref()
313            .map_or(true, |mz_status| mz_status.needs_update(&status))
314        {
315            return Ok(new_mz);
316        }
317
318        new_mz.status = Some(status);
319        mz_api
320            .replace_status(
321                &mz.name_unchecked(),
322                &PostParams::default(),
323                serde_json::to_vec(&new_mz).unwrap(),
324            )
325            .await
326    }
327}
328
/// Reconciliation logic for `Materialize` resources.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles one `Materialize` resource.
    ///
    /// Steps, in order:
    /// 1. If the resource has no status yet, write an initial one and return.
    /// 2. If `request_rollout` or `environment_id` is nil, fill in fresh
    ///    UUIDs, replace the resource and return.
    /// 3. Compare the stored resources hash with the hash of the environmentd
    ///    resources generated for the active generation. When they differ and
    ///    a rollout is requested, apply the new resources, promote services
    ///    and record the outcome in the status. Otherwise only bring the
    ///    status conditions up to date.
    /// 4. Once step 3 finished with `Ok(None)`, apply or clean up the
    ///    balancer resources, then the console resources.
    ///
    /// Returns `Ok(Some(action))` when the environmentd or balancer step
    /// hands back an action, and `Ok(None)` otherwise.
    ///
    /// # Errors
    ///
    /// Returns `Error::Kube` for failed status or spec writes and
    /// `Error::Anyhow` for failures applying resources or parsing the
    /// environmentd image reference.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());

        let status = mz.status();
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            // Updating the status should trigger a reconciliation
            // which will include a status this time.
            return Ok(None);
        }

        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                mz.spec.environment_id = Uuid::new_v4();
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            // Updating the spec should also trigger a reconciliation.
            // We can't do that as part of the above check because you can't
            // update both the spec and the status in a single api call.
            return Ok(None);
        }

        // we compare the hash against the environment resources generated
        // for the current active generation, since that's what we expect to
        // have been applied earlier, but we don't want to use these
        // environment resources because when we apply them, we want to apply
        // them with data that uses the new generation
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        // in-place rollouts keep the active generation number
        let increment_generation = has_current_changes && !mz.in_place_rollout();
        let desired_generation = if increment_generation {
            next_generation
        } else {
            active_generation
        };

        // here we regenerate the environment resources using the
        // same inputs except with an updated generation
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        let mut result = if has_current_changes {
            if mz.rollout_requested() {
                // we remove the environment resources hash annotation here
                // because if we fail halfway through applying the resources,
                // things will be in an inconsistent state, and we don't want
                // to allow the possibility of the user making a second
                // change which reverts to the original state and then
                // skipping retrying this apply, since that would leave
                // things in a permanently inconsistent state.
                // note that environment.spec will be empty here after
                // replace_status, but this is fine because we already
                // extracted all of the information we want from the spec
                // earlier.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            // don't update the reconciliation id yet,
                            // because the rollout hasn't yet completed. if
                            // we fail later on, we want to ensure that the
                            // rollout gets retried.
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // from here on, `mz` and `status` refer to the object
                // returned by the status update above
                let mz = &mz;
                let status = mz.status();

                trace!("applying environment resources");
                match resources
                    .apply(
                        &client,
                        increment_generation,
                        mz.should_force_promote(),
                        &mz.namespace(),
                    )
                    .await
                {
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // do this last, so that we keep traffic pointing at
                        // the previous environmentd until the new one is
                        // fully ready
                        resources.promote_services(&client, &mz.namespace()).await?;
                        if increment_generation {
                            resources
                                .teardown_generation(&client, mz, active_generation)
                                .await?;
                        }
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation: desired_generation,
                                last_completed_rollout_request: mz.requested_reconciliation_id(),
                                resource_id: status.resource_id,
                                resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "True".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Successfully applied changes for generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Applied".into(),
                                }],
                            },
                            false,
                        )
                        .await?;
                        Ok(None)
                    }
                    Err(e) => {
                        // TODO should we actually tear things down here?
                        // I don't think we should. This might have some weird behavior if we hit
                        // transient errors. We might end up tearing down many hours of progress on
                        // rehydration due to a transient failure to reapply an already working K8S
                        // object.
                        resources
                            .teardown_generation(&client, mz, next_generation)
                            .await?;
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                // also don't update the reconciliation id
                                // here, because there was an error during
                                // the rollout and we want to ensure it gets
                                // retried.
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            } else {
                // changes exist but no rollout was requested: only report
                // that we are waiting for approval
                let mut needs_update = mz.conditions_need_update();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
        } else {
            // this can happen if we update the environment, but then revert
            // that update before the update was deployed. in this case, we
            // don't want the environment to still show up as
            // WaitingForApproval.
            let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
            if mz.update_in_progress() {
                resources
                    .teardown_generation(&client, mz, next_generation)
                    .await?;
                needs_update = true;
            }
            if needs_update {
                self.update_status(
                    &mz_api,
                    mz,
                    MaterializeStatus {
                        active_generation,
                        last_completed_rollout_request: mz.requested_reconciliation_id(),
                        resource_id: status.resource_id,
                        resources_hash: status.resources_hash,
                        conditions: vec![Condition {
                            type_: "UpToDate".into(),
                            status: "True".into(),
                            last_transition_time: Time(chrono::offset::Utc::now()),
                            message: format!(
                                "No changes found from generation {active_generation}"
                            ),
                            observed_generation: mz.meta().generation,
                            reason: "Applied".into(),
                        }],
                    },
                    active_generation != desired_generation,
                )
                .await?;
            }
            debug!("no changes");
            Ok(None)
        };

        // balancers rely on the environmentd service existing, which is
        // enforced by the environmentd rollout process being able to call
        // into the promotion endpoint

        // anything other than Ok(None) (a pending action or an error) stops
        // the reconciliation here
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await;
        }

        // and the console relies on the balancer service existing, which is
        // enforced by balancer::Resources::apply having a check for its pods
        // being up, and not returning successfully until they are

        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // the environmentd image tag is whatever follows the last ':' of the
        // image ref; a ref without any ':' is rejected
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        // an entry in console_image_tag_map keyed by the environmentd image
        // tag wins; otherwise fall back to console_image_tag_default
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        result.map_err(Error::Anyhow)
    }

    /// Called when a `Materialize` resource is being removed.
    ///
    /// Only drops the resource from the needs-update bookkeeping (which also
    /// refreshes the metric); the Kubernetes client is not used. Always
    /// returns `Ok(None)`.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
672
/// Builds the reference of a sibling image (for example `console`) that
/// lives next to the given environmentd image.
///
/// * `environmentd_image_ref` - image reference of environmentd, such as
///   `materialize/environmentd:v1` or `host:5000/materialize/environmentd`.
/// * `image_name` - name of the sibling image.
/// * `image_tag` - tag to use; when `None`, the tag of
///   `environmentd_image_ref` is reused, or `unstable` if it has none.
///
/// The namespace is everything before the last `/` of
/// `environmentd_image_ref`, or `materialize` when there is no `/`.
///
/// Returns `"{namespace}/{image_name}:{tag}"`.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    let (namespace, image) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        // Look for the tag separator only in the final path component: a `:`
        // before the last `/` belongs to a registry `host:port` prefix and
        // must not be mistaken for a tag.
        image.rsplit_once(':').map_or("unstable", |(_, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}