mz_orchestratord/controller/materialize.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::{
11    collections::BTreeSet,
12    fmt::Display,
13    str::FromStr,
14    sync::{Arc, Mutex},
15};
16
17use http::HeaderValue;
18use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
19use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
20use serde::Deserialize;
21use tracing::{debug, trace};
22
23use crate::metrics::Metrics;
24use mz_cloud_provider::CloudProvider;
25use mz_cloud_resources::crd::materialize::v1alpha1::{
26    Materialize, MaterializeCertSpec, MaterializeStatus,
27};
28use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
29use mz_orchestrator_tracing::TracingCliArgs;
30use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
31
32pub mod balancer;
33pub mod console;
34pub mod environmentd;
35pub mod tls;
36
// Command-line configuration for the `Materialize` controller.
//
// Plain `//` comments are used on purpose throughout this struct: clap's
// derive turns `///` doc comments into `--help` text, so adding them would
// change the CLI output.
//
// NOTE(review): most fields are not read in this file; they reach the
// `environmentd`, `balancer` and `console` resource builders via
// `&self.config`, and their exact use is not visible here.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // With `aws`, `Context::new` additionally requires `--aws-account-id` and
    // `--environmentd-iam-role-arn` (see `AwsInfo`).
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, `apply` also reconciles balancerd resources after environmentd.
    #[clap(long)]
    create_balancers: bool,
    // When set, `apply` also reconciles console resources after the balancers.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    // Segment analytics settings.
    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag selection: `apply` looks up the environmentd image
    // tag as a key in `console_image_tag_map` and falls back to
    // `console_image_tag_default` when no entry matches.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-related flags, flattened in from `AwsInfo`.
    #[clap(flatten)]
    aws_info: AwsInfo,

    // Pod/runtime settings for generated workloads.
    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Label/node selectors, one list per component. NOTE(review): each entry
    // is presumably given as `key=value` (`KeyValueArg`) — confirm in mz_ore.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // `--network-policies-*` flags, flattened in from `NetworkPolicyConfig`.
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Optional overrides for cluster replica sizes and builtin cluster
    // replication factors; `None` when the flag is not given.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // HTTP origins allowed by environmentd, parsed as `HeaderValue`s.
    // NOTE(review): presumably used for CORS — confirm in `environmentd`.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // environmentd ports. NOTE(review): `i32` presumably matches the
    // Kubernetes port field type.
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: i32,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: i32,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: i32,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: i32,
    #[clap(long)]
    environmentd_internal_http_host_override: Option<String>,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: i32,

    // balancerd ports.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: i32,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: i32,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: i32,

    // console port.
    #[clap(long, default_value = "8080")]
    console_http_port: i32,

    // JSON object parsed via `DefaultCertificateSpecs::from_str`; the default
    // `{}` leaves every spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help`.
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
157
/// Default certificate specs, parsed from the JSON value of the
/// `--default-certificate-specs` flag.
///
/// JSON keys are camelCase (`balancerdExternal`, `consoleExternal`,
/// `internal`). Every key is optional, so `{}` (the flag's default) leaves
/// all three unset. Unknown keys are ignored because no
/// `deny_unknown_fields` is set.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// Spec for the balancerd external certificate, if any (`balancerdExternal`).
    balancerd_external: Option<MaterializeCertSpec>,
    /// Spec for the console external certificate, if any (`consoleExternal`).
    console_external: Option<MaterializeCertSpec>,
    /// Spec for the internal certificate, if any (`internal`).
    internal: Option<MaterializeCertSpec>,
}
165
166impl FromStr for DefaultCertificateSpecs {
167    type Err = serde_json::Error;
168
169    fn from_str(s: &str) -> Result<Self, Self::Err> {
170        serde_json::from_str(s)
171    }
172}
173
// AWS-specific flags, flattened into `MaterializeControllerArgs`.
//
// `Context::new` asserts that `aws_account_id` and
// `environmentd_iam_role_arn` are set when `--cloud-provider=aws`; the
// remaining fields are optional even on AWS.
// Plain `//` comments on purpose: clap's derive would turn `///` into
// `--help` text.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // Required with `--cloud-provider=aws`.
    #[clap(long)]
    aws_account_id: Option<String>,
    // Required with `--cloud-provider=aws`.
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
187
// Network-policy flags, flattened into `MaterializeControllerArgs`.
//
// Each field has an explicit `--network-policies-*` long name, so the short
// field names below are not themselves the flag names.
// Plain `//` comments on purpose: clap's derive would turn `///` into
// `--help` text.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    // `--network-policies-internal-enabled`
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    // `--network-policies-ingress-enabled`
    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // `--network-policies-ingress-cidrs`; NOTE(review): presumably the CIDR
    // ranges used when ingress policies are enabled.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    // `--network-policies-egress-enabled`
    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // `--network-policies-egress-cidrs`; NOTE(review): presumably the CIDR
    // ranges used when egress policies are enabled.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
205
206#[derive(Debug, thiserror::Error)]
207pub enum Error {
208    Anyhow(#[from] anyhow::Error),
209    Kube(#[from] kube::Error),
210}
211
212impl Display for Error {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        match self {
215            Self::Anyhow(e) => write!(f, "{e}"),
216            Self::Kube(e) => write!(f, "{e}"),
217        }
218    }
219}
220
/// State shared by every reconcile of `Materialize` objects.
pub struct Context {
    /// Parsed controller command-line flags.
    config: MaterializeControllerArgs,
    /// Tracing flags, forwarded to `environmentd::Resources::new`.
    tracing: TracingCliArgs,
    /// Namespace of orchestratord itself (per the field name), forwarded to
    /// `environmentd::Resources::new`.
    orchestratord_namespace: String,
    /// Controller metrics; the `needs_update` metric is republished from the
    /// size of the set below whenever it changes.
    metrics: Arc<Metrics>,
    /// Objects currently flagged by `set_needs_update`. Kept behind a mutex
    /// because `set_needs_update` mutates it through `&self`.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
228
229impl Context {
230    pub fn new(
231        config: MaterializeControllerArgs,
232        tracing: TracingCliArgs,
233        orchestratord_namespace: String,
234        metrics: Arc<Metrics>,
235    ) -> Self {
236        if config.cloud_provider == CloudProvider::Aws {
237            assert!(
238                config.aws_info.aws_account_id.is_some(),
239                "--aws-account-id is required when using --cloud-provider=aws"
240            );
241            assert!(
242                config.aws_info.environmentd_iam_role_arn.is_some(),
243                "--environmentd-iam-role-arn is required when using --cloud-provider=aws"
244            );
245        }
246
247        Self {
248            config,
249            tracing,
250            orchestratord_namespace,
251            metrics,
252            needs_update: Default::default(),
253        }
254    }
255
256    fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
257        let mut needs_update_set = self.needs_update.lock().unwrap();
258        if needs_update {
259            needs_update_set.insert(mz.name_unchecked());
260        } else {
261            needs_update_set.remove(&mz.name_unchecked());
262        }
263        self.metrics
264            .needs_update
265            .set(u64::cast_from(needs_update_set.len()));
266    }
267
268    async fn update_status(
269        &self,
270        mz_api: &Api<Materialize>,
271        mz: &Materialize,
272        status: MaterializeStatus,
273        needs_update: bool,
274    ) -> Result<Materialize, kube::Error> {
275        self.set_needs_update(mz, needs_update);
276
277        let mut new_mz = mz.clone();
278        if !mz
279            .status
280            .as_ref()
281            .map_or(true, |mz_status| mz_status.needs_update(&status))
282        {
283            return Ok(new_mz);
284        }
285
286        new_mz.status = Some(status);
287        mz_api
288            .replace_status(
289                &mz.name_unchecked(),
290                &PostParams::default(),
291                serde_json::to_vec(&new_mz).unwrap(),
292            )
293            .await
294    }
295}
296
// Reconciler for `Materialize` objects, driven by the `k8s_controller` crate.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    // Finalizer identifier used by `k8s_controller` for this controller.
    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles one `Materialize` object.
    ///
    /// Steps, in order:
    /// 1. If the object has no status yet, write an initial one and return.
    ///    The status write is expected to trigger another reconcile.
    /// 2. Hash the environmentd resources for the *active* generation and
    ///    compare the hash against `status.resources_hash` to detect changes.
    /// 3. Act on the result of that comparison:
    ///    - Changes and a requested rollout: apply the resources for the
    ///      desired generation, promote services, tear down the previous
    ///      generation (only when a new one was created), and record the
    ///      new generation and hash.
    ///    - Changes but no rollout request: only mark the object
    ///      `WaitingForApproval`.
    ///    - No changes: mark the object up to date.
    /// 4. Once environmentd has settled (`Ok(None)`), apply balancerd
    ///    resources (with `--create-balancers`) and then console resources
    ///    (with `--create-console`).
    ///
    /// Returns `Ok(Some(action))` when the environmentd or balancer resource
    /// apply reports it is not done yet. Returns `Ok(None)` once every enabled
    /// step has been applied.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());

        // NOTE(review): `mz.status()` presumably returns a default/initial
        // status when `mz.status` is `None` — it is written as-is below.
        let status = mz.status();
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            // Updating the status should trigger a reconciliation
            // which will include a status this time.
            return Ok(None);
        }

        // we compare the hash against the environment resources generated
        // for the current active generation, since that's what we expect to
        // have been applied earlier, but we don't want to use these
        // environment resources because when we apply them, we want to apply
        // them with data that uses the new generation
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        // In-place rollouts reuse the active generation number instead of
        // creating a new generation.
        let increment_generation = has_current_changes && !mz.in_place_rollout();
        let desired_generation = if increment_generation {
            next_generation
        } else {
            active_generation
        };

        // here we regenerate the environment resources using the
        // same inputs except with an updated generation
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        let mut result = if has_current_changes {
            if mz.rollout_requested() {
                // we remove the environment resources hash annotation here
                // because if we fail halfway through applying the resources,
                // things will be in an inconsistent state, and we don't want
                // to allow the possibility of the user making a second
                // change which reverts to the original state and then
                // skipping retrying this apply, since that would leave
                // things in a permanently inconsistent state.
                // note that environment.spec will be empty here after
                // replace_status, but this is fine because we already
                // extracted all of the information we want from the spec
                // earlier.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            // don't update the reconciliation id yet,
                            // because the rollout hasn't yet completed. if
                            // we fail later on, we want to ensure that the
                            // rollout gets retried.
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // Rebind to the object returned by the status write, so the
                // status read below reflects what was just written.
                let mz = &mz;
                let status = mz.status();

                trace!("applying environment resources");
                match resources
                    .apply(
                        &client,
                        &self.config,
                        increment_generation,
                        mz.should_force_promote(),
                        &mz.namespace(),
                    )
                    .await
                {
                    // Not ready yet: hand the action back to the controller
                    // without touching the status again.
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // do this last, so that we keep traffic pointing at
                        // the previous environmentd until the new one is
                        // fully ready
                        resources.promote_services(&client, &mz.namespace()).await?;
                        // Only a fresh generation leaves an old one behind.
                        if increment_generation {
                            resources
                                .teardown_generation(&client, mz, active_generation)
                                .await?;
                        }
                        // Rollout finished: record the new generation and
                        // hash, and mark the requested rollout as completed.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation: desired_generation,
                                last_completed_rollout_request: mz.requested_reconciliation_id(),
                                resource_id: status.resource_id,
                                resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "True".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Successfully applied changes for generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Applied".into(),
                                }],
                            },
                            false,
                        )
                        .await?;
                        Ok(None)
                    }
                    Err(e) => {
                        // NOTE(review): if this teardown fails, `?` returns
                        // its error instead of `e` and skips the FailedDeploy
                        // status update below.
                        resources
                            .teardown_generation(&client, mz, next_generation)
                            .await?;
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                // also don't update the reconciliation id
                                // here, because there was an error during
                                // the rollout and we want to ensure it gets
                                // retried.
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                // `status` was re-read after the write above,
                                // so this is presumably the cleared hash
                                // written there.
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            } else {
                // Changes exist but no rollout was requested: clean up any
                // half-finished next generation and report that approval is
                // pending.
                let mut needs_update = mz.conditions_need_update();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
        } else {
            // this can happen if we update the environment, but then revert
            // that update before the update was deployed. in this case, we
            // don't want the environment to still show up as
            // WaitingForApproval.
            // A pending rollout request is also marked complete here, since
            // there is nothing to roll out.
            let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
            if mz.update_in_progress() {
                resources
                    .teardown_generation(&client, mz, next_generation)
                    .await?;
                needs_update = true;
            }
            if needs_update {
                self.update_status(
                    &mz_api,
                    mz,
                    MaterializeStatus {
                        active_generation,
                        last_completed_rollout_request: mz.requested_reconciliation_id(),
                        resource_id: status.resource_id,
                        resources_hash: status.resources_hash,
                        conditions: vec![Condition {
                            type_: "UpToDate".into(),
                            status: "True".into(),
                            last_transition_time: Time(chrono::offset::Utc::now()),
                            message: format!(
                                "No changes found from generation {active_generation}"
                            ),
                            observed_generation: mz.meta().generation,
                            reason: "Applied".into(),
                        }],
                    },
                    active_generation != desired_generation,
                )
                .await?;
            }
            debug!("no changes");
            Ok(None)
        };

        // balancers rely on the environmentd service existing, which is
        // enforced by the environmentd rollout process being able to call
        // into the promotion endpoint

        // Stop here while environmentd is still rolling out (`Ok(Some(_))`)
        // or has failed (`Err`).
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        if self.config.create_balancers {
            result = balancer::Resources::new(&self.config, mz)
                .apply(&client, &mz.namespace())
                .await;
        }

        // and the console relies on the balancer service existing, which is
        // enforced by balancer::Resources::apply having a check for its pods
        // being up, and not returning successfully until they are

        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        if self.config.create_console {
            // NOTE(review): `rsplit_once(':')` splits at the last ':' of the
            // whole reference. A `...:tag@sha256:<hex>` ref therefore yields
            // the digest hex here, and a tagless ref on a `host:port`
            // registry yields part of the path. Neither would match a
            // `console_image_tag_map` key, so the default tag is used.
            let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
            else {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "failed to parse environmentd image ref: {}",
                    mz.spec.environmentd_image_ref
                )));
            };
            // Map the environmentd tag to a console tag, falling back to
            // `--console-image-tag-default`.
            let console_image_tag = self
                .config
                .console_image_tag_map
                .iter()
                .find(|kv| kv.key == environmentd_image_tag)
                .map(|kv| kv.value.clone())
                .unwrap_or_else(|| self.config.console_image_tag_default.clone());
            console::Resources::new(
                &self.config,
                mz,
                &matching_image_from_environmentd_image_ref(
                    &mz.spec.environmentd_image_ref,
                    "console",
                    Some(&console_image_tag),
                ),
            )
            .apply(&client, &mz.namespace())
            .await?;
        }

        result.map_err(Error::Anyhow)
    }

    /// Called by `k8s_controller` to clean up after a `Materialize` object.
    ///
    /// Only drops `mz` from the needs-update tracking (and refreshes the
    /// metric). It deletes no Kubernetes objects itself.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
617
/// Builds the reference of a sibling image (e.g. `console`) that lives in the
/// same repository namespace as the given environmentd image.
///
/// - The namespace is everything before the last `/` of
///   `environmentd_image_ref`, or `materialize` when there is no `/`.
/// - The tag is `image_tag` when given. Otherwise it is the tag of the
///   environmentd reference itself, falling back to `unstable` when that
///   reference is untagged.
///
/// Only the final path component is searched for a tag, so the `:` of a
/// registry `host:port` is never mistaken for one. A trailing `@digest` is
/// stripped first, because a digest identifies the environmentd image only
/// and itself contains a `:`.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    let (namespace, name_and_tag) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        let without_digest = name_and_tag
            .split_once('@')
            .map_or(name_and_tag, |(before_digest, _digest)| before_digest);
        without_digest
            .rsplit_once(':')
            .map_or("unstable", |(_name, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}