1use std::{
11 collections::BTreeSet,
12 fmt::Display,
13 str::FromStr,
14 sync::{Arc, Mutex},
15};
16
17use anyhow::Context as _;
18use http::HeaderValue;
19use k8s_openapi::{
20 api::core::v1::{Affinity, ResourceRequirements, Secret, Toleration},
21 apimachinery::pkg::apis::meta::v1::{Condition, Time},
22};
23use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
24use serde::Deserialize;
25use tracing::{debug, trace};
26use uuid::Uuid;
27
28use crate::{controller::materialize::environmentd::V161, metrics::Metrics};
29use mz_cloud_provider::CloudProvider;
30use mz_cloud_resources::crd::materialize::v1alpha1::{
31 Materialize, MaterializeCertSpec, MaterializeRolloutStrategy, MaterializeStatus,
32};
33use mz_license_keys::validate;
34use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
35use mz_orchestrator_tracing::TracingCliArgs;
36use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
37
38pub mod balancer;
39pub mod console;
40pub mod environmentd;
41pub mod tls;
42
// Command-line configuration for the Materialize controller, parsed with clap's derive API.
//
// NOTE: plain `//` comments are used deliberately throughout this struct. clap's derive turns
// `///` doc comments into `--help`/about text, so doc comments here would change the program's
// output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // `Context::new` requires an AWS account id when this is `CloudProvider::Aws`.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, `apply` creates the balancer resources; otherwise it cleans them up.
    #[clap(long)]
    create_balancers: bool,
    // When set, `apply` creates the console resources; otherwise it cleans them up.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag selection: `apply` looks up the environmentd image tag in the map and
    // falls back to the default tag when there is no matching key.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific flags, flattened into this argument set.
    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Per-component scheduling settings. Affinities, tolerations and default resources are
    // supplied as JSON and decoded by `parse_affinity`, `parse_tolerations` and
    // `parse_resources`. Each occurrence of a `--*-toleration` flag is parsed as one
    // `Toleration` and collected into the corresponding list.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    environmentd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    balancerd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    console_default_resources: Option<ResourceRequirements>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // Network policy flags, flattened into this argument set.
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Cluster replica sizing and replication factors. These are not read in this file;
    // presumably they are forwarded to environmentd — TODO confirm in the `environmentd` module.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Origins are parsed as HTTP header values; the listed defaults apply when the flag is
    // not given.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // Ports for environmentd.
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // Ports for balancerd.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    // Port for the console.
    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON object decoded through `DefaultCertificateSpecs::from_str`; the default `{}` leaves
    // every spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help` output (`hide = true`).
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
183
184fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
185 Ok(serde_json::from_str(s)?)
186}
187
188fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
189 Ok(serde_json::from_str(s)?)
190}
191
192fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
193 Ok(serde_json::from_str(s)?)
194}
195
/// Default certificate specs supplied on the command line as a JSON object.
///
/// Keys are camelCase (`balancerdExternal`, `consoleExternal`, `internal`) because of
/// `rename_all = "camelCase"`. Every key is optional; the `Default` value leaves all unset.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// Spec under the `balancerdExternal` key, if provided.
    balancerd_external: Option<MaterializeCertSpec>,
    /// Spec under the `consoleExternal` key, if provided.
    console_external: Option<MaterializeCertSpec>,
    /// Spec under the `internal` key, if provided.
    internal: Option<MaterializeCertSpec>,
}
203
204impl FromStr for DefaultCertificateSpecs {
205 type Err = serde_json::Error;
206
207 fn from_str(s: &str) -> Result<Self, Self::Err> {
208 serde_json::from_str(s)
209 }
210}
211
// AWS-specific command-line flags, flattened into `MaterializeControllerArgs`.
//
// NOTE: plain `//` comments are used deliberately; clap's derive would turn `///` doc comments
// into `--help` text.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // `Context::new` asserts that this is set when the cloud provider is AWS.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
225
// Network policy command-line flags, flattened into `MaterializeControllerArgs`.
// Every flag is exposed with an explicit `--network-policies-*` long name.
//
// NOTE: plain `//` comments are used deliberately; clap's derive would turn `///` doc comments
// into `--help` text.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // Presumably the CIDR blocks allowed for ingress — TODO confirm against the consumer.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // Presumably the CIDR blocks allowed for egress — TODO confirm against the consumer.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
243
/// Errors produced while reconciling a Materialize resource.
///
/// Each variant wraps an underlying error and gets a `From` conversion through `#[from]`, so
/// `?` can be used on all three error types. No `#[error(...)]` attributes are given, so the
/// `Display` implementation is written by hand in this file.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// An `anyhow` error.
    Anyhow(#[from] anyhow::Error),
    /// An error from the `kube` client.
    Kube(#[from] kube::Error),
    /// An error from `reqwest`.
    Reqwest(#[from] reqwest::Error),
}
250
251impl Display for Error {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 match self {
254 Self::Anyhow(e) => write!(f, "{e}"),
255 Self::Kube(e) => write!(f, "{e}"),
256 Self::Reqwest(e) => write!(f, "{e}"),
257 }
258 }
259}
260
/// Shared state used when reconciling Materialize resources.
pub struct Context {
    /// Parsed controller command-line configuration.
    config: MaterializeControllerArgs,
    /// Tracing command-line arguments, passed to `environmentd::Resources::new`.
    tracing: TracingCliArgs,
    /// Namespace string passed to `environmentd::Resources::new`; presumably the namespace
    /// orchestratord itself runs in — TODO confirm.
    orchestratord_namespace: String,
    /// Metrics handle; `set_needs_update` updates its `environmentd_needs_update` value.
    metrics: Arc<Metrics>,
    /// Names of the Materialize resources currently flagged as needing an update. The size
    /// of this set is what gets reported through `metrics.environmentd_needs_update`.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
268
269impl Context {
270 pub fn new(
271 config: MaterializeControllerArgs,
272 tracing: TracingCliArgs,
273 orchestratord_namespace: String,
274 metrics: Arc<Metrics>,
275 ) -> Self {
276 if config.cloud_provider == CloudProvider::Aws {
277 assert!(
278 config.aws_info.aws_account_id.is_some(),
279 "--aws-account-id is required when using --cloud-provider=aws"
280 );
281 }
282
283 Self {
284 config,
285 tracing,
286 orchestratord_namespace,
287 metrics,
288 needs_update: Default::default(),
289 }
290 }
291
292 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
293 let mut needs_update_set = self.needs_update.lock().unwrap();
294 if needs_update {
295 needs_update_set.insert(mz.name_unchecked());
296 } else {
297 needs_update_set.remove(&mz.name_unchecked());
298 }
299 self.metrics
300 .environmentd_needs_update
301 .set(u64::cast_from(needs_update_set.len()));
302 }
303
304 async fn update_status(
305 &self,
306 mz_api: &Api<Materialize>,
307 mz: &Materialize,
308 status: MaterializeStatus,
309 needs_update: bool,
310 ) -> Result<Materialize, kube::Error> {
311 self.set_needs_update(mz, needs_update);
312
313 let mut new_mz = mz.clone();
314 if !mz
315 .status
316 .as_ref()
317 .map_or(true, |mz_status| mz_status.needs_update(&status))
318 {
319 return Ok(new_mz);
320 }
321
322 new_mz.status = Some(status);
323 mz_api
324 .replace_status(
325 &mz.name_unchecked(),
326 &PostParams::default(),
327 serde_json::to_vec(&new_mz).unwrap(),
328 )
329 .await
330 }
331
332 async fn promote(
333 &self,
334 client: &Client,
335 mz: &Materialize,
336 resources: environmentd::Resources,
337 active_generation: u64,
338 desired_generation: u64,
339 resources_hash: String,
340 ) -> Result<Option<Action>, Error> {
341 if let Some(action) = resources.promote_services(client, &mz.namespace()).await? {
342 return Ok(Some(action));
343 }
344 resources
345 .teardown_generation(client, mz, active_generation)
346 .await?;
347 let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
348 self.update_status(
349 &mz_api,
350 mz,
351 MaterializeStatus {
352 active_generation: desired_generation,
353 last_completed_rollout_request: mz.requested_reconciliation_id(),
354 resource_id: mz.status().resource_id,
355 resources_hash,
356 conditions: vec![Condition {
357 type_: "UpToDate".into(),
358 status: "True".into(),
359 last_transition_time: Time(chrono::offset::Utc::now()),
360 message: format!(
361 "Successfully applied changes for generation {desired_generation}"
362 ),
363 observed_generation: mz.meta().generation,
364 reason: "Applied".into(),
365 }],
366 },
367 false,
368 )
369 .await?;
370 Ok(None)
371 }
372}
373
/// Controller hooks for `Materialize` resources: `apply` reconciles a resource and `cleanup`
/// runs the controller's cleanup hook for it.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles one `Materialize` resource.
    ///
    /// In order, this:
    /// 1. writes an initial status if the resource has none, and returns;
    /// 2. reads the backend secret and validates the `license_key` entry, if present;
    /// 3. fills in a nil `request_rollout` / `environment_id` in the spec, and returns;
    /// 4. checks the spec's environment id against the one in the license key;
    /// 5. drives the environmentd rollout depending on whether the resource is promoting,
    ///    has changes, and has a rollout requested;
    /// 6. applies or cleans up the balancer and console resources.
    ///
    /// Returns `Ok(Some(action))` as soon as a step yields an action, otherwise the result
    /// of the balancer step.
    ///
    /// # Errors
    ///
    /// Returns an error when an API call fails, the license key is invalid or missing where
    /// required, the environment ids do not match, or the environmentd image reference
    /// contains no `:`.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());
        let secret_api: Api<Secret> = Api::namespaced(client.clone(), &mz.namespace());

        let status = mz.status();
        // No status stored yet: write the one obtained from `mz.status()` and stop here.
        // Presumably the status write triggers another reconciliation — TODO confirm.
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            return Ok(None);
        }

        // Read the license key from the backend secret, validate it, and parse the
        // environment id it carries.
        let backend_secret = secret_api.get(&mz.spec.backend_secret_name).await?;
        let license_key_environment_id: Option<Uuid> = if let Some(license_key) = backend_secret
            .data
            .as_ref()
            .and_then(|data| data.get("license_key"))
        {
            let license_key = validate(
                str::from_utf8(&license_key.0)
                    .context("invalid utf8")?
                    .trim(),
            )?;
            let environment_id = license_key
                .environment_id
                .parse()
                .context("invalid environment id in license key")?;
            Some(environment_id)
        } else {
            // A missing license key is only an error when the resource meets the `V161`
            // minimum version.
            if mz.meets_minimum_version(&V161) {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "license_key is required when running in kubernetes",
                )));
            } else {
                None
            }
        };

        // Fill in nil identifiers in the spec, persist the resource, and stop here.
        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                if let Some(environment_id) = license_key_environment_id {
                    // A nil id in the license key means a random one is generated instead.
                    if environment_id.is_nil() {
                        mz.spec.environment_id = Uuid::new_v4();
                    } else {
                        mz.spec.environment_id = environment_id;
                    }
                } else {
                    if mz.meets_minimum_version(&V161) {
                        return Err(Error::Anyhow(anyhow::anyhow!(
                            "environmentId is not set in materialize resource {}/{} but no license key was given",
                            mz.namespace(),
                            mz.name_unchecked()
                        )));
                    } else {
                        mz.spec.environment_id = Uuid::new_v4();
                    }
                }
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            return Ok(None);
        }

        // A non-nil environment id in the license key must match the one in the spec.
        if let Some(environment_id) = license_key_environment_id {
            if !environment_id.is_nil() && mz.spec.environment_id != environment_id {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "environment_id is set in materialize resource {}/{} but does not match the environment_id set in the associated license key {}",
                    mz.namespace(),
                    mz.name_unchecked(),
                    environment_id,
                )));
            }
        }

        // Build the resources for the currently active generation and compare their hash
        // with the hash stored in the status to detect changes.
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        let desired_generation = if has_current_changes {
            next_generation
        } else {
            active_generation
        };

        // Resources for the generation we want to end up on.
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        // Dispatch on (is promoting, has changes, rollout requested).
        let mut result = match (
            mz.is_promoting(),
            has_current_changes,
            mz.rollout_requested(),
        ) {
            // A promotion is in progress: continue it regardless of the other two flags.
            (true, _, _) => {
                self.promote(
                    &client,
                    mz,
                    resources,
                    active_generation,
                    desired_generation,
                    resources_hash,
                )
                .await
            }
            // There are changes and a rollout was requested: apply them.
            (false, true, true) => {
                // Mark the rollout as in progress. The stored hash is blanked here; since
                // `has_current_changes` compares the stored hash with a freshly generated
                // one, later reconciliations keep seeing changes until a new hash is stored.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // From here on, work with the resource and status returned by the update.
                let mz = &mz;
                let status = mz.status();

                // With this strategy the active generation is torn down before the new
                // resources are applied.
                if mz.spec.rollout_strategy
                    == MaterializeRolloutStrategy::ImmediatelyPromoteCausingDowntime
                {
                    resources
                        .teardown_generation(&client, mz, active_generation)
                        .await?;
                }

                trace!("applying environment resources");
                match resources
                    .apply(&client, mz.should_force_promote(), &mz.namespace())
                    .await
                {
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // Store the new hash and mark the status as promoting, then promote.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status
                                    .last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: resources_hash.clone(),
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "Unknown".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Attempting to promote generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Promoting".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        self.promote(
                            &client,
                            mz,
                            resources,
                            active_generation,
                            desired_generation,
                            resources_hash,
                        )
                        .await
                    }
                    Err(e) => {
                        // Record the failure in the status, then propagate the original error.
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            }
            // There are changes but no rollout was requested: wait for approval.
            (false, true, false) => {
                let mut needs_update = mz.conditions_need_update();
                // An update that is still in progress is abandoned by tearing down the next
                // generation.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
            // No changes: make sure the status says the resource is up to date.
            (false, false, _) => {
                let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
                // An update that is still in progress is abandoned by tearing down the next
                // generation.
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "True".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "No changes found from generation {active_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applied".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("no changes");
                Ok(None)
            }
        }?;

        // The environmentd step yielded an action: return it before touching the balancer
        // and console.
        if let Some(action) = result {
            return Ok(Some(action));
        }

        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await?;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await?;
        }

        if let Some(action) = result {
            return Ok(Some(action));
        }

        // The console image tag is chosen from the environmentd image tag.
        // NOTE(review): splitting the whole reference on the last `:` picks up a registry
        // port when the reference has a port but no tag — confirm such references cannot
        // occur here.
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        // Use the mapped console tag for this environmentd tag, or the configured default.
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        // The value returned by the console step is discarded; only errors propagate.
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        // `result` is `None` here: any action from the balancer step was returned above.
        Ok(result)
    }

    /// Cleanup hook for a `Materialize` resource.
    ///
    /// Only drops `mz` from the needs-update bookkeeping (and refreshes the metric); the
    /// client is unused and no action is ever returned.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
805
/// Derives the image reference of a sibling image (for example the console) from the
/// environmentd image reference.
///
/// The result has the form `{namespace}/{image_name}:{tag}`, where:
///
/// * `namespace` is everything before the last `/` of `environmentd_image_ref`, or
///   `materialize` when the reference contains no `/`;
/// * `tag` is `image_tag` when given; otherwise the tag of `environmentd_image_ref`, or
///   `unstable` when the reference carries no tag.
///
/// The tag is only looked for in the final path component of the reference, so the port of
/// a registry host (as in `localhost:5000/materialize/environmentd`) is never mistaken for
/// a tag.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    // Without any `/` the whole reference is the repository part and the namespace defaults
    // to `materialize`.
    let (namespace, repository) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        // Restricting the search to the last path component keeps a `host:port` prefix out
        // of the tag.
        repository
            .rsplit_once(':')
            .map_or("unstable", |(_, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}