1use std::{
11 collections::BTreeSet,
12 fmt::Display,
13 str::FromStr,
14 sync::{Arc, Mutex},
15};
16
17use anyhow::Context as _;
18use http::HeaderValue;
19use k8s_openapi::{
20 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
21 apimachinery::pkg::apis::meta::v1::{Condition, Time},
22};
23use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
24use serde::Deserialize;
25use tracing::{debug, trace};
26use uuid::Uuid;
27
28use crate::metrics::Metrics;
29use mz_cloud_provider::CloudProvider;
30use mz_cloud_resources::crd::materialize::v1alpha1::{
31 Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus,
32};
33use mz_license_keys::validate;
34use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
35use mz_orchestrator_tracing::TracingCliArgs;
36use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
37
38pub mod balancer;
39pub mod console;
40pub mod environmentd;
41pub mod tls;
42
// Command-line configuration for the Materialize resource controller.
//
// A value of this struct is stored in `Context` and handed by reference to the
// `environmentd`, `balancer` and `console` resource builders.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macro turns
// `///` doc comments into help text, so converting these to doc comments would
// change the generated `--help` output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // Cloud provider this controller runs on. `Context::new` panics when this is
    // `CloudProvider::Aws` and `--aws-account-id` was not supplied.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, `apply` creates the balancer resources for each Materialize
    // resource; when unset it runs the balancer cleanup instead.
    #[clap(long)]
    create_balancers: bool,
    // Same switch for the console resources.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag selection: `apply` looks up the environmentd image tag
    // in `console_image_tag_map` (key = environmentd tag, value = console tag)
    // and falls back to `console_image_tag_default` when there is no entry.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific flags, flattened into this argument list.
    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Per-component scheduling and resource settings. Affinities, tolerations
    // and resource requirements are given as JSON and decoded by
    // `parse_affinity`, `parse_tolerations` and `parse_resources`.
    //
    // The toleration flags are spelled in the singular on the command line
    // (e.g. `--environmentd-toleration`); each value is decoded as ONE
    // `Toleration` object and the values are collected into the `Vec`.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    environmentd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    balancerd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    console_default_resources: Option<ResourceRequirements>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // Network policy flags (`--network-policies-*`), flattened into this list.
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Cluster replica sizing / replication settings. They are not read in this
    // file; presumably forwarded to environmentd by the `environmentd`
    // submodule -- TODO confirm there.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Origins used when the flag is not given: three local development URLs
    // plus the staging console. Each value must parse as an HTTP header value.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // environmentd ports (defaults 6875-6879).
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // balancerd ports. The SQL/HTTP defaults equal the environmentd defaults.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON object decoded through `DefaultCertificateSpecs::from_str`. The
    // default `{}` leaves every certificate spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Accepted on the command line but hidden from `--help` (`hide = true`).
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
183
184fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
185 Ok(serde_json::from_str(s)?)
186}
187
188fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
189 Ok(serde_json::from_str(s)?)
190}
191
192fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
193 Ok(serde_json::from_str(s)?)
194}
195
/// Optional certificate specs supplied through `--default-certificate-specs`.
///
/// Deserialized from a JSON object whose keys are the camelCase forms of the
/// field names (`balancerdExternal`, `consoleExternal`, `internal`). Every
/// field is optional, so `{}` yields a value with all specs unset.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// JSON key `balancerdExternal`. Presumably the default spec for
    /// balancerd's externally facing certificate -- confirm in the `tls` module.
    balancerd_external: Option<MaterializeCertSpec>,
    /// JSON key `consoleExternal`. Presumably the default spec for the
    /// console's externally facing certificate -- confirm in the `tls` module.
    console_external: Option<MaterializeCertSpec>,
    /// JSON key `internal`. Presumably the default spec for internal
    /// certificates -- confirm in the `tls` module.
    internal: Option<MaterializeCertSpec>,
}
203
204impl FromStr for DefaultCertificateSpecs {
205 type Err = serde_json::Error;
206
207 fn from_str(s: &str) -> Result<Self, Self::Err> {
208 serde_json::from_str(s)
209 }
210}
211
// AWS-specific controller flags, flattened into `MaterializeControllerArgs`.
//
// NOTE: plain `//` comments are used on purpose; clap's derive macro would turn
// `///` doc comments into help text and change the `--help` output.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // Required when `--cloud-provider=aws`: `Context::new` asserts it is set.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
225
// Network policy flags, flattened into `MaterializeControllerArgs`.
//
// Every flag is exposed under an explicit `--network-policies-*` name rather
// than one derived from the field name.
//
// NOTE: plain `//` comments are used on purpose; clap's derive macro would turn
// `///` doc comments into help text and change the `--help` output.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // Presumably the CIDR blocks allowed for ingress -- confirm where the
    // policies are generated.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // Presumably the CIDR blocks allowed for egress -- confirm where the
    // policies are generated.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
243
/// Error type returned by the Materialize reconciler.
///
/// Both variants carry `#[from]`, so `?` converts `anyhow::Error` and
/// `kube::Error` into this type automatically. No `#[error(...)]` attributes
/// are given, so the derive does not generate `Display`; it is implemented by
/// hand for this type.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Any failure reported through `anyhow` (validation, parsing, resource
    /// builders).
    Anyhow(#[from] anyhow::Error),
    /// A failure reported by the Kubernetes client.
    Kube(#[from] kube::Error),
}
249
250impl Display for Error {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 match self {
253 Self::Anyhow(e) => write!(f, "{e}"),
254 Self::Kube(e) => write!(f, "{e}"),
255 }
256 }
257}
258
/// Shared state of the Materialize reconciler.
pub struct Context {
    /// Parsed controller flags; passed by reference to the `environmentd`,
    /// `balancer` and `console` resource builders.
    config: MaterializeControllerArgs,
    /// Tracing CLI arguments, passed to `environmentd::Resources::new`.
    tracing: TracingCliArgs,
    /// Namespace passed to `environmentd::Resources::new`. Presumably the
    /// namespace orchestratord itself runs in -- confirm with the caller of
    /// `Context::new`.
    orchestratord_namespace: String,
    /// Metrics handle; `set_needs_update` writes the
    /// `environmentd_needs_update` metric through it.
    metrics: Arc<Metrics>,
    /// Names of the Materialize resources currently flagged as needing an
    /// update. Its size is what gets published to the metric.
    ///
    /// NOTE(review): entries are keyed by resource name only (no namespace),
    /// so same-named resources in different namespaces share one entry.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
266
267impl Context {
268 pub fn new(
269 config: MaterializeControllerArgs,
270 tracing: TracingCliArgs,
271 orchestratord_namespace: String,
272 metrics: Arc<Metrics>,
273 ) -> Self {
274 if config.cloud_provider == CloudProvider::Aws {
275 assert!(
276 config.aws_info.aws_account_id.is_some(),
277 "--aws-account-id is required when using --cloud-provider=aws"
278 );
279 }
280
281 Self {
282 config,
283 tracing,
284 orchestratord_namespace,
285 metrics,
286 needs_update: Default::default(),
287 }
288 }
289
290 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
291 let mut needs_update_set = self.needs_update.lock().unwrap();
292 if needs_update {
293 needs_update_set.insert(mz.name_unchecked());
294 } else {
295 needs_update_set.remove(&mz.name_unchecked());
296 }
297 self.metrics
298 .environmentd_needs_update
299 .set(u64::cast_from(needs_update_set.len()));
300 }
301
302 async fn update_status(
303 &self,
304 mz_api: &Api<Materialize>,
305 mz: &Materialize,
306 status: MaterializeStatus,
307 needs_update: bool,
308 ) -> Result<Materialize, kube::Error> {
309 self.set_needs_update(mz, needs_update);
310
311 let mut new_mz = mz.clone();
312 if !mz
313 .status
314 .as_ref()
315 .map_or(true, |mz_status| mz_status.needs_update(&status))
316 {
317 return Ok(new_mz);
318 }
319
320 new_mz.status = Some(status);
321 mz_api
322 .replace_status(
323 &mz.name_unchecked(),
324 &PostParams::default(),
325 serde_json::to_vec(&new_mz).unwrap(),
326 )
327 .await
328 }
329
330 async fn promote(
331 &self,
332 client: &Client,
333 mz: &Materialize,
334 resources: environmentd::Resources,
335 active_generation: u64,
336 desired_generation: u64,
337 resources_hash: String,
338 ) -> Result<(), Error> {
339 resources.promote_services(client, &mz.namespace()).await?;
340 resources
341 .teardown_generation(client, mz, active_generation)
342 .await?;
343 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
344 self.update_status(
345 &mz_api,
346 mz,
347 MaterializeStatus {
348 active_generation: desired_generation,
349 last_completed_rollout_request: mz.requested_reconciliation_id(),
350 resource_id: mz.status().resource_id,
351 resources_hash,
352 conditions: vec![Condition {
353 type_: "UpToDate".into(),
354 status: "True".into(),
355 last_transition_time: Time(chrono::offset::Utc::now()),
356 message: format!(
357 "Successfully applied changes for generation {desired_generation}"
358 ),
359 observed_generation: mz.meta().generation,
360 reason: "Applied".into(),
361 }],
362 },
363 false,
364 )
365 .await?;
366 Ok(())
367 }
368}
369
/// Reconciler hooks for `Materialize` resources.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles one `Materialize` resource.
    ///
    /// Steps, in order:
    /// 1. If the resource has no status yet, write an initial one and return.
    /// 2. Read the backend secret and, if it holds a `license_key` entry,
    ///    validate it and extract its environment id.
    /// 3. If `request_rollout` or `environment_id` in the spec is nil, fill
    ///    them in, write the spec back and return.
    /// 4. Reject a spec whose environment id contradicts the license key.
    /// 5. Work out whether changes are pending and drive the rollout state
    ///    machine (promote / apply / wait for approval / nothing to do).
    /// 6. Apply or clean up the balancer, then the console resources.
    ///
    /// # Errors
    /// Returns the first error raised by any Kubernetes call, by license key
    /// validation, or by one of the resource builders.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());

        // A resource without a status gets the value returned by `mz.status()`
        // persisted (presumably a default -- confirm in the CRD impl) and is
        // flagged as needing an update. Nothing else happens in this pass;
        // presumably the status write triggers another reconcile -- confirm.
        let status = mz.status();
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            return Ok(None);
        }

        // The license key is optional. When the secret has a `license_key`
        // entry it must be UTF-8, pass `validate`, and carry an environment id
        // that parses as a UUID; any failure aborts the reconcile.
        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
            .data
            .as_ref()
            .and_then(|data| data.get("license_key"))
        {
            let license_key = validate(
                str::from_utf8(&license_key.0)
                    .context("invalid utf8")?
                    .trim(),
            )?;
            let environment_id = license_key
                .environment_id
                .parse()
                .context("invalid environment id in license key")?;
            Some(environment_id)
        } else {
            None
        };

        // Fill in spec fields that are still nil and write the spec back.
        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            // Local mutable copy; shadows the borrowed `mz` inside this block.
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                if let Some(environment_id) = license_key_environment_id {
                    if environment_id.is_nil() {
                        // The license key does not pin an environment id, so a
                        // random one is generated.
                        mz.spec.environment_id = Uuid::new_v4();
                    } else {
                        mz.spec.environment_id = environment_id;
                    }
                } else {
                    return Err(Error::Anyhow(anyhow::anyhow!(
                        "environment_id is not set in materialize resource {}/{} but no license key was given",
                        mz.namespace(),
                        mz.name_unchecked()
                    )));
                }
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            // Stop here; presumably the spec update triggers another
            // reconcile with the filled-in values -- confirm.
            return Ok(None);
        }

        // A non-nil environment id in the license key must match the spec.
        if let Some(environment_id) = license_key_environment_id {
            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
                    mz.namespace(),
                    mz.name_unchecked(),
                    environment_id,
                )));
            }
        }

        // Build the resources as they would look for the ACTIVE generation and
        // compare their hash with the one stored in the status: a mismatch
        // means there are changes that have not been rolled out yet.
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        let desired_generation = if has_current_changes {
            next_generation
        } else {
            active_generation
        };

        // Resources for the generation we want to end up on; its hash is what
        // gets stored in the status once that generation is applied.
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        // `result` is a `Result<Option<Action>, anyhow::Error>`; it is mapped
        // into `Error::Anyhow` wherever it is returned.
        let mut result = match (
            mz.is_promoting(),
            has_current_changes,
            mz.rollout_requested(),
        ) {
            // A promotion is already under way: finish it.
            (true, _, _) => {
                self.promote(
                    &client,
                    mz,
                    resources,
                    active_generation,
                    desired_generation,
                    resources_hash,
                )
                .await?;
                Ok(None)
            }
            // Changes are pending and a rollout has been requested: apply them.
            (false, true, true) => {
                // Record "Applying" with an EMPTY resources hash before
                // touching anything. Presumably this is so a partially failed
                // apply can never be mistaken for an up-to-date state --
                // confirm.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // From here on work with the object returned by the status
                // update and with ITS status (both names are shadowed).
                let mz = &mz;
                let status = mz.status();

                // With this strategy the active generation is torn down
                // BEFORE the new one is applied.
                if mz.spec.rollout_strategy
                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
                {
                    resources
                        .teardown_generation(&client, mz, active_generation)
                        .await?;
                }

                trace!("applying environment resources");
                match resources
                    .apply(&client, mz.should_force_promote(), &mz.namespace())
                    .await
                {
                    // The builder asked for a follow-up action; hand it to the
                    // caller unchanged.
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    // Apply finished: record "Promoting" together with the
                    // new resources hash, then promote.
                    Ok(None) => {
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status
                                    .last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: resources_hash.clone(),
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "Unknown".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Attempting to promote generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Promoting".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        self.promote(
                            &client,
                            mz,
                            resources,
                            active_generation,
                            desired_generation,
                            resources_hash,
                        )
                        .await?;
                        Ok(None)
                    }
                    // Apply failed: record "FailedDeploy" and return the
                    // error. If this status write fails too, ITS error is
                    // returned instead and `e` is dropped.
                    Err(e) => {
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            }
            // Changes are pending but no rollout was requested: wait.
            (false, true, false) => {
                let mut needs_update = mz.conditions_need_update();
                // If an update is marked as in progress, the next generation's
                // resources are torn down (presumably leftovers of an
                // abandoned rollout -- confirm) and the status is rewritten.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
            // Nothing has changed relative to the active generation.
            (false, false, _) => {
                // A requested rollout with nothing to roll out still forces a
                // status write, which records the request id in
                // `last_completed_rollout_request`.
                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
                // Same teardown of the next generation as in the waiting
                // branch when an update is marked as in progress.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "True".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "No changes found from generation {active_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applied".into(),
                            }],
                        },
                        // Always false here: with no pending changes the
                        // desired generation equals the active one.
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("no changes");
                Ok(None)
            }
        };

        // Anything other than a clean `Ok(None)` (an error, or an action to
        // hand back to the caller) ends the reconcile before the balancer and
        // console are touched.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // Balancer: created/updated when enabled, cleaned up otherwise.
        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await;
        }

        // Same early exit: the console is only handled once the balancer step
        // reported `Ok(None)`.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // The environmentd tag is whatever follows the LAST ':' of the image
        // ref; a ref containing no ':' at all is rejected.
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        // Pick the console tag mapped to this environmentd tag, falling back
        // to the configured default when there is no mapping.
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        // Console: created/updated when enabled, cleaned up otherwise.
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        // At this point `result` is the `Ok(None)` from the balancer step.
        result.map_err(Error::Anyhow)
    }

    /// Cleanup hook for a `Materialize` resource.
    ///
    /// Only removes the resource from the needs-update set (and republishes
    /// the metric). The client is unused and no Kubernetes objects are deleted
    /// by this function.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
793
/// Builds the image reference of a sibling component (e.g. `console`) that
/// lives in the same repository namespace as the environmentd image.
///
/// * `environmentd_image_ref` - image reference of environmentd, in the usual
///   `[registry[:port]/]namespace/name[:tag][@digest]` shape.
/// * `image_name` - name of the sibling image.
/// * `image_tag` - tag to use; when `None`, the tag of
///   `environmentd_image_ref` is reused, or `unstable` if it has none.
///
/// The namespace is everything before the last `/`; a reference without any
/// `/` falls back to the `materialize` namespace.
///
/// Returns `{namespace}/{image_name}:{tag}`.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    let (namespace, final_component) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        // A digest (`@sha256:...`) contains a ':' of its own but is not a tag
        // and cannot apply to a different image, so drop it first.
        let name_and_tag = final_component
            .split_once('@')
            .map_or(final_component, |(name_and_tag, _digest)| name_and_tag);
        // Only the final path component can carry a tag. Searching the whole
        // reference would mistake a registry port (`localhost:5000/...`) for
        // the tag separator.
        name_and_tag
            .rsplit_once(':')
            .map_or("unstable", |(_name, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}