1use std::{
11 collections::BTreeSet,
12 fmt::Display,
13 str::FromStr,
14 sync::{Arc, Mutex},
15};
16
17use http::HeaderValue;
18use k8s_openapi::{
19 api::core::v1::{Affinity, Toleration},
20 apimachinery::pkg::apis::meta::v1::{Condition, Time},
21};
22use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
23use serde::Deserialize;
24use tracing::{debug, trace};
25use uuid::Uuid;
26
27use crate::metrics::Metrics;
28use mz_cloud_provider::CloudProvider;
29use mz_cloud_resources::crd::materialize::v1alpha1::{
30 Materialize, MaterializeCertSpec, MaterializeStatus,
31};
32use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
33use mz_orchestrator_tracing::TracingCliArgs;
34use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
35
36pub mod balancer;
37pub mod console;
38pub mod environmentd;
39pub mod tls;
40
// Command-line configuration for the Materialize controller.
//
// Comments in this struct deliberately use `//` rather than `///`: clap's derive turns doc
// comments into `--help` text, so `///` comments would change the CLI's user-visible output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // `Context::new` panics if this is `aws` and `--aws-account-id` was not supplied.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, `apply` creates balancer resources for each Materialize; otherwise it runs
    // the balancer cleanup instead.
    #[clap(long)]
    create_balancers: bool,
    // When set, `apply` creates console resources for each Materialize; otherwise it runs
    // the console cleanup instead.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image selection. `apply` takes the tag of the environmentd image ref, uses the
    // `value` of the first `console_image_tag_map` entry whose `key` equals that tag, and
    // falls back to `console_image_tag_default` when no entry matches.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific flags, flattened into this struct (see `AwsInfo`).
    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Per-component scheduling constraints.
    // - Affinities are single JSON-encoded `Affinity` objects, parsed by `parse_affinity`.
    // - Each occurrence of a `--<component>-toleration` flag is one JSON-encoded `Toleration`
    //   (parsed by `parse_tolerations`); the occurrences are collected into the `Vec`.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // Network-policy flags, flattened into this struct (see `NetworkPolicyConfig`).
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Optional overrides for environmentd cluster replica sizes and the bootstrap sizes /
    // replication factors of the builtin clusters; consumed outside this file.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Origins accepted by environmentd, each parsed as an HTTP `HeaderValue`. The defaults are
    // the local-development URLs and the staging console URL listed below.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // Port numbers for environmentd's listeners.
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // Port numbers for balancerd's listeners.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON object parsed through `DefaultCertificateSpecs::from_str`; the default `{}` leaves
    // every spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help` output (`hide = true`).
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
175
176fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
177 Ok(serde_json::from_str(s)?)
178}
179
180fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
181 Ok(serde_json::from_str(s)?)
182}
183
/// Default certificate specifications, supplied to the controller as a JSON object via
/// `--default-certificate-specs` (default `{}`).
///
/// Keys use camelCase (`balancerdExternal`, `consoleExternal`, `internal`). Any key that is
/// absent deserializes to `None`.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// JSON key `balancerdExternal`. Presumably the default spec for balancerd's external
    /// certificate; confirm against the `tls`/`balancer` modules.
    balancerd_external: Option<MaterializeCertSpec>,
    /// JSON key `consoleExternal`. Presumably the default spec for the console's external
    /// certificate.
    console_external: Option<MaterializeCertSpec>,
    /// JSON key `internal`. Presumably the default spec for internal certificates.
    internal: Option<MaterializeCertSpec>,
}
191
192impl FromStr for DefaultCertificateSpecs {
193 type Err = serde_json::Error;
194
195 fn from_str(s: &str) -> Result<Self, Self::Err> {
196 serde_json::from_str(s)
197 }
198}
199
// AWS-specific controller flags, flattened into `MaterializeControllerArgs`.
// (`//` rather than `///` so that clap's generated help text is unchanged.)
#[derive(clap::Parser)]
pub struct AwsInfo {
    // Required when `--cloud-provider=aws`; `Context::new` panics if it is missing.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
213
// Network-policy flags, flattened into `MaterializeControllerArgs`.
// Every flag is given an explicit `long` name with a `network-policies-` prefix, so the
// field names themselves stay short.
// (`//` rather than `///` so that clap's generated help text is unchanged.)
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // CIDR ranges for ingress rules; presumably only consulted when ingress is enabled.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // CIDR ranges for egress rules; presumably only consulted when egress is enabled.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
231
232#[derive(Debug, thiserror::Error)]
233pub enum Error {
234 Anyhow(#[from] anyhow::Error),
235 Kube(#[from] kube::Error),
236}
237
238impl Display for Error {
239 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240 match self {
241 Self::Anyhow(e) => write!(f, "{e}"),
242 Self::Kube(e) => write!(f, "{e}"),
243 }
244 }
245}
246
/// Shared state for the Materialize controller's reconciliation callbacks.
pub struct Context {
    /// Parsed command-line configuration for the controller.
    config: MaterializeControllerArgs,
    /// Tracing CLI configuration, passed through to `environmentd::Resources::new`.
    tracing: TracingCliArgs,
    /// Passed through to `environmentd::Resources::new`. Presumably the namespace that
    /// orchestratord itself runs in; confirm at the construction site.
    orchestratord_namespace: String,
    /// Controller metrics. `environmentd_needs_update` is kept in sync by `set_needs_update`.
    metrics: Arc<Metrics>,
    /// Names (`name_unchecked()`) of the Materialize resources currently flagged as needing
    /// an environmentd update.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
254
255impl Context {
256 pub fn new(
257 config: MaterializeControllerArgs,
258 tracing: TracingCliArgs,
259 orchestratord_namespace: String,
260 metrics: Arc<Metrics>,
261 ) -> Self {
262 if config.cloud_provider == CloudProvider::Aws {
263 assert!(
264 config.aws_info.aws_account_id.is_some(),
265 "--aws-account-id is required when using --cloud-provider=aws"
266 );
267 }
268
269 Self {
270 config,
271 tracing,
272 orchestratord_namespace,
273 metrics,
274 needs_update: Default::default(),
275 }
276 }
277
278 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
279 let mut needs_update_set = self.needs_update.lock().unwrap();
280 if needs_update {
281 needs_update_set.insert(mz.name_unchecked());
282 } else {
283 needs_update_set.remove(&mz.name_unchecked());
284 }
285 self.metrics
286 .environmentd_needs_update
287 .set(u64::cast_from(needs_update_set.len()));
288 }
289
290 async fn update_status(
291 &self,
292 mz_api: &Api<Materialize>,
293 mz: &Materialize,
294 status: MaterializeStatus,
295 needs_update: bool,
296 ) -> Result<Materialize, kube::Error> {
297 self.set_needs_update(mz, needs_update);
298
299 let mut new_mz = mz.clone();
300 if !mz
301 .status
302 .as_ref()
303 .map_or(true, |mz_status| mz_status.needs_update(&status))
304 {
305 return Ok(new_mz);
306 }
307
308 new_mz.status = Some(status);
309 mz_api
310 .replace_status(
311 &mz.name_unchecked(),
312 &PostParams::default(),
313 serde_json::to_vec(&new_mz).unwrap(),
314 )
315 .await
316 }
317}
318
// Reconciliation callbacks invoked by `k8s_controller` for each `Materialize` resource.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles a single `Materialize` resource.
    ///
    /// Steps, in order:
    /// 1. If the status subresource is unset, write an initial status and return.
    /// 2. If `request_rollout` or `environment_id` is the nil UUID, replace it with a random
    ///    UUID, write the object back, and return.
    /// 3. Compare the stored `resources_hash` with the hash of the environmentd resources
    ///    computed for the active generation. On a mismatch, either apply the changes (when
    ///    `rollout_requested()`) or record that the changes are waiting for approval.
    /// 4. Apply or clean up balancer resources, depending on `create_balancers`.
    /// 5. Apply or clean up console resources, depending on `create_console`.
    ///
    /// Steps 3 and 4 short-circuit: any outcome other than `Ok(None)` (a requeue `Action` or
    /// an error) is returned immediately.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());

        // `mz.status()` is used even when the `status` field is unset (checked next).
        // NOTE(review): presumably it returns an initial status in that case; confirm in the CRD.
        let status = mz.status();
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            return Ok(None);
        }

        // Fill in missing identifiers, then stop. Writing the object back presumably triggers
        // another reconcile that sees the populated spec.
        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                mz.spec.environment_id = Uuid::new_v4();
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            return Ok(None);
        }

        // Resources as they would look for the currently active generation. A hash mismatch
        // with the stored `resources_hash` means the desired resources (derived from the spec
        // and the controller configuration) differ from what was last recorded as applied.
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        // Changes go to a new generation, unless an in-place rollout was requested, in which
        // case the active generation is reused.
        let increment_generation = has_current_changes && !mz.in_place_rollout();
        let desired_generation = if increment_generation {
            next_generation
        } else {
            active_generation
        };

        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        // `result` holds the outcome of each stage. Later stages run only while it is `Ok(None)`.
        let mut result = if has_current_changes {
            if mz.rollout_requested() {
                // Mark the rollout as in progress (UpToDate=Unknown). `resources_hash` is
                // cleared, which presumably guarantees that a rollout interrupted before the
                // success update below is detected as a change again on the next reconcile.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // Continue with the object returned by the status update, and its status.
                let mz = &mz;
                let status = mz.status();

                trace!("applying environment resources");
                match resources
                    .apply(
                        &client,
                        increment_generation,
                        mz.should_force_promote(),
                        &mz.namespace(),
                    )
                    .await
                {
                    // The new environment is not ready yet; hand the requeue action back.
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    // Ready: promote services, drop the previous generation if a new one was
                    // created, and record the new generation and hash (UpToDate=True).
                    Ok(None) => {
                        resources.promote_services(&client, &mz.namespace()).await?;
                        if increment_generation {
                            resources
                                .teardown_generation(&client, mz, active_generation)
                                .await?;
                        }
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation: desired_generation,
                                last_completed_rollout_request: mz.requested_reconciliation_id(),
                                resource_id: status.resource_id,
                                resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "True".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Successfully applied changes for generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Applied".into(),
                                }],
                            },
                            false,
                        )
                        .await?;
                        Ok(None)
                    }
                    // Failure: tear down the next generation, record the failure
                    // (UpToDate=False), and propagate `e`. If the teardown or the status write
                    // itself fails, that error is returned via `?` instead of `e`.
                    // `status.resources_hash` here comes from the re-read status, i.e.
                    // normally the empty hash written by the "Applying" update above.
                    Err(e) => {
                        resources
                            .teardown_generation(&client, mz, next_generation)
                            .await?;
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            } else {
                // Changes exist but no rollout was requested. Abandon any in-progress next
                // generation and report that approval is pending (UpToDate=False).
                let mut needs_update = mz.conditions_need_update();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
        } else {
            // Nothing changed. A requested rollout is acknowledged by recording its id as the
            // last completed request, and any in-progress next generation is torn down.
            let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
            if mz.update_in_progress() {
                resources
                    .teardown_generation(&client, mz, next_generation)
                    .await?;
                needs_update = true;
            }
            if needs_update {
                self.update_status(
                    &mz_api,
                    mz,
                    MaterializeStatus {
                        active_generation,
                        last_completed_rollout_request: mz.requested_reconciliation_id(),
                        resource_id: status.resource_id,
                        resources_hash: status.resources_hash,
                        conditions: vec![Condition {
                            type_: "UpToDate".into(),
                            status: "True".into(),
                            last_transition_time: Time(chrono::offset::Utc::now()),
                            message: format!(
                                "No changes found from generation {active_generation}"
                            ),
                            observed_generation: mz.meta().generation,
                            reason: "Applied".into(),
                        }],
                    },
                    active_generation != desired_generation,
                )
                .await?;
            }
            debug!("no changes");
            Ok(None)
        };

        // Stop here if the environmentd stage requested a requeue or failed.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await;
        }

        // Stop here if the balancer stage requested a requeue or failed.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // The console tag is chosen from the environmentd image tag (the text after the last
        // ':' of the image ref).
        // NOTE(review): this searches the whole ref. A ref with a registry port and no tag
        // (e.g. `host:5000/environmentd`) yields `5000/environmentd`, and a digest ref yields
        // the digest hex rather than a tag.
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        // NOTE(review): unlike the balancer stage, the success value of `apply`/`cleanup` is
        // discarded here. If these return `Option<Action>`, a requested requeue would be
        // dropped; confirm the console::Resources signatures.
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        // At this point `result` is the balancer stage's `Ok(None)`.
        result.map_err(Error::Anyhow)
    }

    /// Cleanup callback for a `Materialize` resource (presumably run on deletion, via the
    /// finalizer).
    ///
    /// It only removes the resource from the needs-update set, which also refreshes the
    /// `environmentd_needs_update` metric. No Kubernetes API calls are made here.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
662
/// Builds the image reference for a sibling image (e.g. `console`) that is published next to
/// the environmentd image.
///
/// The result is `{namespace}/{image_name}:{tag}`, where:
/// - `namespace` is everything before the last `/` of `environmentd_image_ref`: the registry
///   host, an optional port, and the repository path. It is `materialize` when the ref
///   contains no `/`.
/// - `tag` is `image_tag` when given. Otherwise it is the tag of `environmentd_image_ref`, or
///   `unstable` if that ref carries no tag.
///
/// The tag is looked for only in the final path component, after any `@digest` suffix has
/// been removed. This keeps a registry port (`host:5000/environmentd`) or a digest
/// (`environmentd@sha256:...`) from being mistaken for a tag.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    // Split off the final `name[:tag][@digest]` component from the repository prefix.
    let (namespace, last_component) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        // A digest is not a tag, and it cannot be reused for a different image.
        let name_and_tag = last_component
            .split_once('@')
            .map_or(last_component, |(name_and_tag, _digest)| name_and_tag);
        name_and_tag
            .rsplit_once(':')
            .map_or("unstable", |(_name, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}