1use std::{
11 collections::BTreeSet,
12 fmt::Display,
13 str::FromStr,
14 sync::{Arc, Mutex},
15};
16
17use http::HeaderValue;
18use k8s_openapi::{
19 api::core::v1::{Affinity, ResourceRequirements, Toleration},
20 apimachinery::pkg::apis::meta::v1::{Condition, Time},
21};
22use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
23use serde::Deserialize;
24use tracing::{debug, trace};
25use uuid::Uuid;
26
27use crate::metrics::Metrics;
28use mz_cloud_provider::CloudProvider;
29use mz_cloud_resources::crd::materialize::v1alpha1::{
30 Materialize, MaterializeCertSpec, MaterializeStatus,
31};
32use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
33use mz_orchestrator_tracing::TracingCliArgs;
34use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
35
36pub mod balancer;
37pub mod console;
38pub mod environmentd;
39pub mod tls;
40
// Command-line arguments for the Materialize controller, parsed with clap's
// derive API. An instance is stored as `Context::config` and handed to the
// environmentd, balancer, and console resource builders during `apply`.
//
// NOTE: the comments in this struct are deliberately plain `//` comments.
// clap's derive turns `///` doc comments into user-visible help text, so
// using doc comments here would change the program's `--help` output.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // When this is `CloudProvider::Aws`, `Context::new` asserts that
    // `aws_info.aws_account_id` has been provided.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // `apply` applies the balancer resources when set and cleans them up
    // otherwise.
    #[clap(long)]
    create_balancers: bool,
    // `apply` applies the console resources when set and cleans them up
    // otherwise.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag selection: `apply` looks up the environmentd image tag
    // as a key in `console_image_tag_map` and falls back to
    // `console_image_tag_default` when there is no matching entry.
    #[clap(long)]
    console_image_tag_default: String,
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    // AWS-specific flags, flattened into this argument set.
    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Per-component scheduling and resource settings. Affinities, tolerations,
    // and resource requirements are supplied as JSON and decoded by
    // `parse_affinity`, `parse_tolerations`, and `parse_resources`. The
    // toleration parser decodes a single `Toleration` per value, and the flag
    // names are singular (`--<component>-toleration`).
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    environmentd_affinity: Option<Affinity>,
    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
    environmentd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    environmentd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    clusterd_affinity: Option<Affinity>,
    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
    clusterd_tolerations: Option<Vec<Toleration>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    balancerd_affinity: Option<Affinity>,
    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
    balancerd_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    balancerd_default_resources: Option<ResourceRequirements>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, value_parser = parse_affinity)]
    console_affinity: Option<Affinity>,
    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
    console_tolerations: Option<Vec<Toleration>>,
    #[clap(long, value_parser = parse_resources)]
    console_default_resources: Option<ResourceRequirements>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    // Network policy flags, flattened into this argument set; each flag is
    // explicitly named with a `network-policies-` prefix.
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Cluster replica size and replication factor overrides. All are optional.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Origins parsed as HTTP header values; the defaults below apply when the
    // flag is not given.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // environmentd ports (defaults 6875-6879).
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: u16,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: u16,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: u16,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: u16,

    // balancerd ports.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: u16,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: u16,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: u16,

    // console port.
    #[clap(long, default_value = "8080")]
    console_http_port: u16,

    // JSON document decoded through `DefaultCertificateSpecs::from_str`; the
    // default is the empty object `{}`.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from the generated help output (`hide = true`).
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
181
182fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
183 Ok(serde_json::from_str(s)?)
184}
185
186fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
187 Ok(serde_json::from_str(s)?)
188}
189
190fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
191 Ok(serde_json::from_str(s)?)
192}
193
/// Default certificate specs supplied through the
/// `--default-certificate-specs` flag.
///
/// Deserialized from JSON with camelCase keys (`balancerdExternal`,
/// `consoleExternal`, `internal`). Every field is optional.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// Value of the `balancerdExternal` key, if present.
    balancerd_external: Option<MaterializeCertSpec>,
    /// Value of the `consoleExternal` key, if present.
    console_external: Option<MaterializeCertSpec>,
    /// Value of the `internal` key, if present.
    internal: Option<MaterializeCertSpec>,
}
201
202impl FromStr for DefaultCertificateSpecs {
203 type Err = serde_json::Error;
204
205 fn from_str(s: &str) -> Result<Self, Self::Err> {
206 serde_json::from_str(s)
207 }
208}
209
// AWS-specific command-line flags, flattened into `MaterializeControllerArgs`
// as `aws_info`.
//
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the `--help` output.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // `Context::new` asserts this is set when the cloud provider is AWS.
    #[clap(long)]
    aws_account_id: Option<String>,
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
223
// Network policy command-line flags, flattened into
// `MaterializeControllerArgs` as `network_policies`. Each flag name is spelled
// out explicitly with a `network-policies-` prefix rather than derived from
// the field name.
//
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the `--help` output.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
241
/// Error type returned by this controller's reconcile callbacks (it is the
/// `Error` associated type of the `k8s_controller::Context` implementation).
///
/// No `#[error(...)]` attributes are given; `Display` is implemented by hand
/// for this type. The `#[from]` attributes provide `From` conversions so `?`
/// can be used on both `anyhow` and `kube` results.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Any error surfaced as an [`anyhow::Error`].
    Anyhow(#[from] anyhow::Error),
    /// An error returned by the Kubernetes client.
    Kube(#[from] kube::Error),
}
247
248impl Display for Error {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 match self {
251 Self::Anyhow(e) => write!(f, "{e}"),
252 Self::Kube(e) => write!(f, "{e}"),
253 }
254 }
255}
256
/// Shared state for the Materialize controller; implements
/// `k8s_controller::Context` to reconcile `Materialize` resources.
pub struct Context {
    /// Parsed controller arguments, passed to the environmentd, balancer, and
    /// console resource builders.
    config: MaterializeControllerArgs,
    /// Tracing CLI arguments, passed to `environmentd::Resources::new`.
    tracing: TracingCliArgs,
    /// Namespace string passed to `environmentd::Resources::new`.
    orchestratord_namespace: String,
    /// Shared metrics handle; the `environmentd_needs_update` gauge is set
    /// from the size of `needs_update`.
    metrics: Arc<Metrics>,
    /// Names of the `Materialize` resources currently flagged as needing an
    /// update. Keyed by resource name only.
    needs_update: Arc<Mutex<BTreeSet<String>>>,
}
264
265impl Context {
266 pub fn new(
267 config: MaterializeControllerArgs,
268 tracing: TracingCliArgs,
269 orchestratord_namespace: String,
270 metrics: Arc<Metrics>,
271 ) -> Self {
272 if config.cloud_provider == CloudProvider::Aws {
273 assert!(
274 config.aws_info.aws_account_id.is_some(),
275 "--aws-account-id is required when using --cloud-provider=aws"
276 );
277 }
278
279 Self {
280 config,
281 tracing,
282 orchestratord_namespace,
283 metrics,
284 needs_update: Default::default(),
285 }
286 }
287
288 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
289 let mut needs_update_set = self.needs_update.lock().unwrap();
290 if needs_update {
291 needs_update_set.insert(mz.name_unchecked());
292 } else {
293 needs_update_set.remove(&mz.name_unchecked());
294 }
295 self.metrics
296 .environmentd_needs_update
297 .set(u64::cast_from(needs_update_set.len()));
298 }
299
300 async fn update_status(
301 &self,
302 mz_api: &Api<Materialize>,
303 mz: &Materialize,
304 status: MaterializeStatus,
305 needs_update: bool,
306 ) -> Result<Materialize, kube::Error> {
307 self.set_needs_update(mz, needs_update);
308
309 let mut new_mz = mz.clone();
310 if !mz
311 .status
312 .as_ref()
313 .map_or(true, |mz_status| mz_status.needs_update(&status))
314 {
315 return Ok(new_mz);
316 }
317
318 new_mz.status = Some(status);
319 mz_api
320 .replace_status(
321 &mz.name_unchecked(),
322 &PostParams::default(),
323 serde_json::to_vec(&new_mz).unwrap(),
324 )
325 .await
326 }
327}
328
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    // NOTE(review): presumably the finalizer the controller framework attaches
    // to managed resources; confirm against the `k8s_controller` docs.
    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles a single `Materialize` resource.
    ///
    /// In order, this:
    /// 1. initializes the status if the resource has none, then returns;
    /// 2. fills in nil `request_rollout` / `environment_id` UUIDs, then
    ///    returns;
    /// 3. compares the stored resources hash with the hash of the resources
    ///    for the active generation and, depending on whether there are
    ///    changes and whether a rollout was requested, applies the
    ///    environmentd resources and/or updates the `UpToDate` condition;
    /// 4. applies or cleans up the balancer resources;
    /// 5. applies or cleans up the console resources.
    ///
    /// Steps 4 and 5 only run when the preceding step produced `Ok(None)`;
    /// any error or requested `Action` is returned immediately.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());

        let status = mz.status();
        // No status stored yet: persist the one returned by `mz.status()` and
        // flag the resource as needing an update, then stop for this pass.
        // NOTE(review): presumably the status write triggers another
        // reconcile — confirm.
        if mz.status.is_none() {
            self.update_status(&mz_api, mz, status, true).await?;
            return Ok(None);
        }

        // Generate fresh v4 UUIDs for any nil identifiers and write the spec
        // back, then stop for this pass.
        if mz.spec.request_rollout.is_nil() || mz.spec.environment_id.is_nil() {
            let mut mz = mz.clone();
            if mz.spec.request_rollout.is_nil() {
                mz.spec.request_rollout = Uuid::new_v4();
            }
            if mz.spec.environment_id.is_nil() {
                mz.spec.environment_id = Uuid::new_v4();
            }
            mz_api
                .replace(&mz.name_unchecked(), &PostParams::default(), &mz)
                .await?;
            return Ok(None);
        }

        // Resources as they would be generated for the currently active
        // generation; their hash is compared with the hash stored in the
        // status to detect changes.
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        // The generation only advances when there are changes and the rollout
        // is not in-place.
        let increment_generation = has_current_changes && !mz.in_place_rollout();
        let desired_generation = if increment_generation {
            next_generation
        } else {
            active_generation
        };

        // Resources for the generation we want to end up on.
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        let mut result = if has_current_changes {
            if mz.rollout_requested() {
                // Mark the rollout as in progress. The resources hash is
                // written as an empty string here; the real hash is only
                // stored after a successful apply (the `Ok(None)` arm below).
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // From here on, work with the object and status returned by
                // the status update rather than the ones passed in.
                let mz = &mz;
                let status = mz.status();

                trace!("applying environment resources");
                match resources
                    .apply(
                        &client,
                        increment_generation,
                        mz.should_force_promote(),
                        &mz.namespace(),
                    )
                    .await
                {
                    // The apply asked for a follow-up action; hand it back to
                    // the caller without promoting anything.
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    // The apply finished: promote the services, tear down the
                    // previously active generation if the generation was
                    // advanced, and record the new generation and hash.
                    Ok(None) => {
                        resources.promote_services(&client, &mz.namespace()).await?;
                        if increment_generation {
                            resources
                                .teardown_generation(&client, mz, active_generation)
                                .await?;
                        }
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation: desired_generation,
                                last_completed_rollout_request: mz.requested_reconciliation_id(),
                                resource_id: status.resource_id,
                                resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "True".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Successfully applied changes for generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Applied".into(),
                                }],
                            },
                            false,
                        )
                        .await?;
                        Ok(None)
                    }
                    // The apply failed: tear down `next_generation`, record
                    // the failure in the status (keeping the active
                    // generation), and propagate the original error. If the
                    // teardown or status update itself fails, that error is
                    // returned instead via `?`.
                    Err(e) => {
                        resources
                            .teardown_generation(&client, mz, next_generation)
                            .await?;
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            } else {
                // Changes exist but no rollout was requested. If an update is
                // marked as in progress, tear down `next_generation` and
                // force a status write; otherwise only write the status when
                // the conditions need refreshing.
                let mut needs_update = mz.conditions_need_update();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
        } else {
            // No changes relative to the stored hash. A requested rollout
            // still forces a status write so that
            // `last_completed_rollout_request` is brought up to date.
            let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
            if mz.update_in_progress() {
                resources
                    .teardown_generation(&client, mz, next_generation)
                    .await?;
                needs_update = true;
            }
            if needs_update {
                self.update_status(
                    &mz_api,
                    mz,
                    MaterializeStatus {
                        active_generation,
                        last_completed_rollout_request: mz.requested_reconciliation_id(),
                        resource_id: status.resource_id,
                        resources_hash: status.resources_hash,
                        conditions: vec![Condition {
                            type_: "UpToDate".into(),
                            status: "True".into(),
                            last_transition_time: Time(chrono::offset::Utc::now()),
                            message: format!(
                                "No changes found from generation {active_generation}"
                            ),
                            observed_generation: mz.meta().generation,
                            reason: "Applied".into(),
                        }],
                    },
                    active_generation != desired_generation,
                )
                .await?;
            }
            debug!("no changes");
            Ok(None)
        };

        // Stop here on an error or when a follow-up action was requested;
        // balancers and console are only handled after `Ok(None)`.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        let balancer = balancer::Resources::new(&self.config, mz);
        if self.config.create_balancers {
            result = balancer.apply(&client, &mz.namespace()).await;
        } else {
            result = balancer.cleanup(&client, &mz.namespace()).await;
        }

        // Same early exit for the balancer step.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        // The environmentd image tag is everything after the last ':' in the
        // image ref; a ref without any ':' is rejected.
        let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
        else {
            return Err(Error::Anyhow(anyhow::anyhow!(
                "failed to parse environmentd image ref: {}",
                mz.spec.environmentd_image_ref
            )));
        };
        // Use the console tag mapped to this environmentd tag, or the
        // configured default when there is no mapping.
        let console_image_tag = self
            .config
            .console_image_tag_map
            .iter()
            .find(|kv| kv.key == environmentd_image_tag)
            .map(|kv| kv.value.clone())
            .unwrap_or_else(|| self.config.console_image_tag_default.clone());
        let console = console::Resources::new(
            &self.config,
            mz,
            &matching_image_from_environmentd_image_ref(
                &mz.spec.environmentd_image_ref,
                "console",
                Some(&console_image_tag),
            ),
        );
        if self.config.create_console {
            console.apply(&client, &mz.namespace()).await?;
        } else {
            console.cleanup(&client, &mz.namespace()).await?;
        }

        // At this point `result` is the `Ok(None)` from the balancer step.
        result.map_err(Error::Anyhow)
    }

    /// Handles deletion of a `Materialize` resource.
    ///
    /// Only clears the resource from the "needs update" set (and refreshes
    /// the associated metric); the Kubernetes client is not used here.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
672
/// Builds an image reference for `image_name` that lives alongside the given
/// environmentd image.
///
/// The result has the form `{namespace}/{image_name}:{tag}` where:
///
/// * `namespace` is everything before the last `/` of
///   `environmentd_image_ref`, or `materialize` when the ref contains no `/`;
/// * `tag` is `image_tag` when provided; otherwise it is the tag of
///   `environmentd_image_ref`, or `unstable` when the ref carries no tag.
///
/// The tag is looked up only in the final path segment of the ref, so a `:`
/// belonging to a registry port (e.g. `localhost:5000/materialize/environmentd`)
/// is not mistaken for a tag separator.
///
/// NOTE(review): digest-pinned refs (`name@sha256:...`) are not treated
/// specially; the text after the last `:` of the final segment is used as the
/// tag.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    // Split off the final path segment (`name[:tag]`); with no `/` at all the
    // whole ref is that segment and the namespace defaults to `materialize`.
    let (namespace, repository) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        repository
            .rsplit_once(':')
            .map_or("unstable", |(_, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}