1use std::{
11 collections::BTreeSet,
12 fmt::Display,
13 str::FromStr,
14 sync::{Arc, Mutex},
15};
16
17use http::HeaderValue;
18use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
19use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
20use serde::Deserialize;
21use tracing::{debug, trace};
22
23use crate::metrics::Metrics;
24use mz_cloud_provider::CloudProvider;
25use mz_cloud_resources::crd::materialize::v1alpha1::{
26 Materialize, MaterializeCertSpec, MaterializeStatus,
27};
28use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
29use mz_orchestrator_tracing::TracingCliArgs;
30use mz_ore::{cast::CastFrom, cli::KeyValueArg, instrument};
31
// Balancer resources (`balancer::Resources`), reconciled when `--create-balancers` is set.
pub mod balancer;
// Console resources (`console::Resources`), reconciled when `--create-console` is set.
pub mod console;
// environmentd resources per generation (`environmentd::Resources`): apply, promote, teardown.
pub mod environmentd;
// TLS-related code; not referenced directly in this file.
pub mod tls;
36
// Command-line configuration for the `Materialize` controller.
//
// Most fields are not read in this file: the whole struct is handed to
// `environmentd::Resources::new`, `balancer::Resources::new` and
// `console::Resources::new`. Plain `//` comments are used here so that these
// notes do not turn into clap `--help` text.
#[derive(clap::Parser)]
pub struct MaterializeControllerArgs {
    // With `aws`, `Context::new` additionally requires `--aws-account-id` and
    // `--environmentd-iam-role-arn`.
    #[clap(long)]
    cloud_provider: CloudProvider,
    #[clap(long)]
    region: String,
    // When set, `apply` also reconciles the balancer resources.
    #[clap(long)]
    create_balancers: bool,
    // When set, `apply` also reconciles the console resources.
    #[clap(long)]
    create_console: bool,
    #[clap(long)]
    helm_chart_version: Option<String>,
    #[clap(long, default_value = "kubernetes")]
    secrets_controller: String,
    #[clap(long)]
    collect_pod_metrics: bool,
    #[clap(long)]
    enable_prometheus_scrape_annotations: bool,
    #[clap(long)]
    disable_authentication: bool,

    #[clap(long)]
    segment_api_key: Option<String>,
    #[clap(long)]
    segment_client_side: bool,

    // Console image tag used when `--console-image-tag-map` has no entry for
    // the environmentd image tag.
    #[clap(long)]
    console_image_tag_default: String,
    // Maps an environmentd image tag (key) to the console image tag (value)
    // deployed alongside it.
    #[clap(long)]
    console_image_tag_map: Vec<KeyValueArg<String, String>>,

    #[clap(flatten)]
    aws_info: AwsInfo,

    #[clap(long)]
    ephemeral_volume_class: Option<String>,
    #[clap(long)]
    scheduler_name: Option<String>,
    #[clap(long)]
    enable_security_context: bool,
    #[clap(long)]
    enable_internal_statement_logging: bool,
    #[clap(long, default_value = "false")]
    disable_statement_logging: bool,

    // Label / node-selector settings, each given as repeated key/value args.
    #[clap(long)]
    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long)]
    console_node_selector: Vec<KeyValueArg<String, String>>,
    #[clap(long, default_value = "always", value_enum)]
    image_pull_policy: KubernetesImagePullPolicy,
    #[clap(flatten)]
    network_policies: NetworkPolicyConfig,

    // Optional cluster-size / replication overrides; presumably forwarded to
    // environmentd's bootstrap configuration — confirm in `environmentd`.
    #[clap(long)]
    environmentd_cluster_replica_sizes: Option<String>,
    #[clap(long)]
    bootstrap_default_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
    #[clap(long)]
    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
    #[clap(long)]
    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,

    // Presumably the CORS allow-list for environmentd's HTTP endpoints —
    // confirm. NOTE(review): the defaults include local-development and
    // staging origins.
    #[clap(
        long,
        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
    )]
    environmentd_allowed_origins: Vec<HeaderValue>,
    #[clap(long, default_value = "https://console.materialize.com")]
    internal_console_proxy_url: String,

    // environmentd listener ports. `i32` presumably to match Kubernetes port
    // fields — confirm.
    #[clap(long, default_value = "6875")]
    environmentd_sql_port: i32,
    #[clap(long, default_value = "6876")]
    environmentd_http_port: i32,
    #[clap(long, default_value = "6877")]
    environmentd_internal_sql_port: i32,
    #[clap(long, default_value = "6878")]
    environmentd_internal_http_port: i32,
    #[clap(long)]
    environmentd_internal_http_host_override: Option<String>,
    #[clap(long, default_value = "6879")]
    environmentd_internal_persist_pubsub_port: i32,

    // balancerd listener ports.
    #[clap(long, default_value = "6875")]
    balancerd_sql_port: i32,
    #[clap(long, default_value = "6876")]
    balancerd_http_port: i32,
    #[clap(long, default_value = "8080")]
    balancerd_internal_http_port: i32,

    // Console listener port.
    #[clap(long, default_value = "8080")]
    console_http_port: i32,

    // JSON object parsed by `DefaultCertificateSpecs::from_str`; the default
    // `{}` leaves every spec unset.
    #[clap(long, default_value = "{}")]
    default_certificate_specs: DefaultCertificateSpecs,

    // Hidden from `--help` (`hide = true`).
    #[clap(long, hide = true)]
    disable_license_key_checks: bool,
}
157
/// Default certificate specs, supplied as a JSON object through
/// `--default-certificate-specs`. Keys are camelCase (`balancerdExternal`,
/// `consoleExternal`, `internal`).
///
/// Every field is optional; the flag's default of `{}` leaves all of them
/// unset. Presumably these apply when a `Materialize` resource does not carry
/// its own cert spec — confirm at the use sites.
#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
    /// Spec for balancerd's external certificate (`balancerdExternal`).
    balancerd_external: Option<MaterializeCertSpec>,
    /// Spec for the console's external certificate (`consoleExternal`).
    console_external: Option<MaterializeCertSpec>,
    /// Spec for internal certificates (`internal`).
    internal: Option<MaterializeCertSpec>,
}
165
166impl FromStr for DefaultCertificateSpecs {
167 type Err = serde_json::Error;
168
169 fn from_str(s: &str) -> Result<Self, Self::Err> {
170 serde_json::from_str(s)
171 }
172}
173
// AWS-specific settings, flattened into `MaterializeControllerArgs`.
// Plain `//` comments keep clap's `--help` text unchanged.
#[derive(clap::Parser)]
pub struct AwsInfo {
    // Required when `--cloud-provider=aws` (asserted in `Context::new`).
    #[clap(long)]
    aws_account_id: Option<String>,
    // Required when `--cloud-provider=aws` (asserted in `Context::new`).
    #[clap(long)]
    environmentd_iam_role_arn: Option<String>,
    #[clap(long)]
    environmentd_connection_role_arn: Option<String>,
    #[clap(long)]
    aws_secrets_controller_tags: Vec<String>,
    #[clap(long)]
    environmentd_availability_zones: Option<Vec<String>>,
}
187
// Network-policy settings, flattened into `MaterializeControllerArgs`.
// Each flag gets an explicit `long` name with a `network-policies-` prefix,
// so the short field names do not leak into the CLI.
#[derive(clap::Parser)]
pub struct NetworkPolicyConfig {
    #[clap(long = "network-policies-internal-enabled")]
    internal_enabled: bool,

    #[clap(long = "network-policies-ingress-enabled")]
    ingress_enabled: bool,

    // CIDRs accompanying the ingress policy.
    #[clap(long = "network-policies-ingress-cidrs")]
    ingress_cidrs: Vec<String>,

    #[clap(long = "network-policies-egress-enabled")]
    egress_enabled: bool,

    // CIDRs accompanying the egress policy.
    #[clap(long = "network-policies-egress-cidrs")]
    egress_cidrs: Vec<String>,
}
205
206#[derive(Debug, thiserror::Error)]
207pub enum Error {
208 Anyhow(#[from] anyhow::Error),
209 Kube(#[from] kube::Error),
210}
211
212impl Display for Error {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 match self {
215 Self::Anyhow(e) => write!(f, "{e}"),
216 Self::Kube(e) => write!(f, "{e}"),
217 }
218 }
219}
220
221pub struct Context {
222 config: MaterializeControllerArgs,
223 tracing: TracingCliArgs,
224 orchestratord_namespace: String,
225 metrics: Arc<Metrics>,
226 needs_update: Arc<Mutex<BTreeSet<String>>>,
227}
228
229impl Context {
230 pub fn new(
231 config: MaterializeControllerArgs,
232 tracing: TracingCliArgs,
233 orchestratord_namespace: String,
234 metrics: Arc<Metrics>,
235 ) -> Self {
236 if config.cloud_provider == CloudProvider::Aws {
237 assert!(
238 config.aws_info.aws_account_id.is_some(),
239 "--aws-account-id is required when using --cloud-provider=aws"
240 );
241 assert!(
242 config.aws_info.environmentd_iam_role_arn.is_some(),
243 "--environmentd-iam-role-arn is required when using --cloud-provider=aws"
244 );
245 }
246
247 Self {
248 config,
249 tracing,
250 orchestratord_namespace,
251 metrics,
252 needs_update: Default::default(),
253 }
254 }
255
256 fn set_needs_update(&self, mz: &Materialize, needs_update: bool) {
257 let mut needs_update_set = self.needs_update.lock().unwrap();
258 if needs_update {
259 needs_update_set.insert(mz.name_unchecked());
260 } else {
261 needs_update_set.remove(&mz.name_unchecked());
262 }
263 self.metrics
264 .needs_update
265 .set(u64::cast_from(needs_update_set.len()));
266 }
267
268 async fn update_status(
269 &self,
270 mz_api: &Api<Materialize>,
271 mz: &Materialize,
272 status: MaterializeStatus,
273 needs_update: bool,
274 ) -> Result<Materialize, kube::Error> {
275 self.set_needs_update(mz, needs_update);
276
277 let mut new_mz = mz.clone();
278 if !mz
279 .status
280 .as_ref()
281 .map_or(true, |mz_status| mz_status.needs_update(&status))
282 {
283 return Ok(new_mz);
284 }
285
286 new_mz.status = Some(status);
287 mz_api
288 .replace_status(
289 &mz.name_unchecked(),
290 &PostParams::default(),
291 serde_json::to_vec(&new_mz).unwrap(),
292 )
293 .await
294 }
295}
296
// Reconciliation hooks for `Materialize` resources, driven by `k8s_controller`.
#[async_trait::async_trait]
impl k8s_controller::Context for Context {
    type Resource = Materialize;
    type Error = Error;

    const FINALIZER_NAME: &'static str = "orchestratord.materialize.cloud/materialize";

    /// Reconciles one `Materialize` object.
    ///
    /// Steps, in order:
    /// 1. If the object has no status yet, write the initial status and return.
    /// 2. Compare the hash of the active generation's environmentd resources
    ///    with the recorded `resources_hash`. If they differ and a rollout was
    ///    requested, apply the changes, possibly as a new generation. If they
    ///    differ without a request, report that the changes are waiting for
    ///    approval.
    /// 3. If step 2 ended with `Ok(None)`, reconcile the balancer (when
    ///    `--create-balancers` is set).
    /// 4. If still `Ok(None)`, reconcile the console (when `--create-console`
    ///    is set).
    ///
    /// `Ok(Some(action))` from any step is returned immediately to request a
    /// requeue.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn apply(
        &self,
        client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        let mz_api: Api<Materialize> = Api::namespaced(client.clone(), &mz.namespace());

        // `mz.status()` is used even when `mz.status` is `None` below
        // (presumably it then yields an initial/default status — confirm).
        let status = mz.status();
        if mz.status.is_none() {
            // No status yet: persist the initial status and stop here.
            // NOTE(review): presumably the status write triggers a fresh
            // reconcile that continues with a status present — confirm.
            self.update_status(&mz_api, mz, status, true).await?;
            return Ok(None);
        }

        // Resources for the currently active generation. If their hash
        // differs from the hash recorded at the last successful apply, the
        // desired state has changed.
        let active_resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            status.active_generation,
        );
        let has_current_changes = status.resources_hash != active_resources.generate_hash();
        let active_generation = status.active_generation;
        let next_generation = active_generation + 1;
        // In-place rollouts reuse the active generation; otherwise changes are
        // rolled out as a new generation (`active_generation + 1`).
        let increment_generation = has_current_changes && !mz.in_place_rollout();
        let desired_generation = if increment_generation {
            next_generation
        } else {
            active_generation
        };

        // Resources for the generation we want to end up on.
        let resources = environmentd::Resources::new(
            &self.config,
            &self.tracing,
            &self.orchestratord_namespace,
            mz,
            desired_generation,
        );
        let resources_hash = resources.generate_hash();

        // Outcome of the environmentd step: `Ok(None)` means done,
        // `Ok(Some(_))` requests a requeue, `Err(_)` is an `anyhow` error.
        let mut result = if has_current_changes {
            if mz.rollout_requested() {
                // Record that the rollout is in progress before touching any
                // resources. `resources_hash` is cleared, so a failure part-way
                // through still shows up as `has_current_changes` on the next
                // reconcile (assuming `generate_hash` never returns an empty
                // string). `last_completed_rollout_request` keeps its old value,
                // so the rollout is not marked complete yet.
                let mz = self
                    .update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: status.last_completed_rollout_request,
                            resource_id: status.resource_id,
                            resources_hash: String::new(),
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "Unknown".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Applying changes for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "Applying".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                // From here on, work with the object returned by
                // `update_status` and its status.
                let mz = &mz;
                let status = mz.status();

                trace!("applying environment resources");
                match resources
                    .apply(
                        &client,
                        &self.config,
                        increment_generation,
                        mz.should_force_promote(),
                        &mz.namespace(),
                    )
                    .await
                {
                    Ok(Some(action)) => {
                        trace!("new environment is not yet ready");
                        Ok(Some(action))
                    }
                    Ok(None) => {
                        // The desired generation is ready: promote its
                        // services, tear down the old generation if a new one
                        // was created, and record the completed rollout
                        // together with the new hash.
                        resources.promote_services(&client, &mz.namespace()).await?;
                        if increment_generation {
                            resources
                                .teardown_generation(&client, mz, active_generation)
                                .await?;
                        }
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation: desired_generation,
                                last_completed_rollout_request: mz.requested_reconciliation_id(),
                                resource_id: status.resource_id,
                                resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "True".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Successfully applied changes for generation {desired_generation}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "Applied".into(),
                                }],
                            },
                            false,
                        )
                        .await?;
                        Ok(None)
                    }
                    Err(e) => {
                        // Remove whatever was created for the next generation
                        // and report the failure. `status` is the status
                        // written above, so `resources_hash` normally stays
                        // cleared and the rollout remains pending.
                        // NOTE(review): if the teardown itself fails, `?`
                        // returns that error instead, dropping `e` and
                        // skipping this status update.
                        resources
                            .teardown_generation(&client, mz, next_generation)
                            .await?;
                        self.update_status(
                            &mz_api,
                            mz,
                            MaterializeStatus {
                                active_generation,
                                last_completed_rollout_request: status.last_completed_rollout_request,
                                resource_id: status.resource_id,
                                resources_hash: status.resources_hash,
                                conditions: vec![Condition {
                                    type_: "UpToDate".into(),
                                    status: "False".into(),
                                    last_transition_time: Time(chrono::offset::Utc::now()),
                                    message: format!(
                                        "Failed to apply changes for generation {desired_generation}: {e}"
                                    ),
                                    observed_generation: mz.meta().generation,
                                    reason: "FailedDeploy".into(),
                                }],
                            },
                            active_generation != desired_generation,
                        )
                        .await?;
                        Err(e)
                    }
                }
            } else {
                // Changes exist but no rollout was requested: leave the active
                // generation alone and report that the changes await approval.
                // If an update is still marked in progress, tear down the next
                // generation first.
                let mut needs_update = mz.conditions_need_update();
                if mz.update_in_progress() {
                    resources
                        .teardown_generation(&client, mz, next_generation)
                        .await?;
                    needs_update = true;
                }
                if needs_update {
                    self.update_status(
                        &mz_api,
                        mz,
                        MaterializeStatus {
                            active_generation,
                            last_completed_rollout_request: mz.requested_reconciliation_id(),
                            resource_id: status.resource_id,
                            resources_hash: status.resources_hash,
                            conditions: vec![Condition {
                                type_: "UpToDate".into(),
                                status: "False".into(),
                                last_transition_time: Time(chrono::offset::Utc::now()),
                                message: format!(
                                    "Changes detected, waiting for approval for generation {desired_generation}"
                                ),
                                observed_generation: mz.meta().generation,
                                reason: "WaitingForApproval".into(),
                            }],
                        },
                        active_generation != desired_generation,
                    )
                    .await?;
                }
                debug!("changes detected, waiting for approval");
                Ok(None)
            }
        } else {
            // Nothing changed. Clean up any in-progress next generation and,
            // if needed, refresh the status to report the active generation
            // as up to date. A pending rollout request is acknowledged by
            // recording `requested_reconciliation_id()` as completed.
            let mut needs_update = mz.conditions_need_update() || mz.rollout_requested();
            if mz.update_in_progress() {
                resources
                    .teardown_generation(&client, mz, next_generation)
                    .await?;
                needs_update = true;
            }
            if needs_update {
                self.update_status(
                    &mz_api,
                    mz,
                    MaterializeStatus {
                        active_generation,
                        last_completed_rollout_request: mz.requested_reconciliation_id(),
                        resource_id: status.resource_id,
                        resources_hash: status.resources_hash,
                        conditions: vec![Condition {
                            type_: "UpToDate".into(),
                            status: "True".into(),
                            last_transition_time: Time(chrono::offset::Utc::now()),
                            message: format!(
                                "No changes found from generation {active_generation}"
                            ),
                            observed_generation: mz.meta().generation,
                            reason: "Applied".into(),
                        }],
                    },
                    active_generation != desired_generation,
                )
                .await?;
            }
            debug!("no changes");
            Ok(None)
        };

        // A requeue or an error from environmentd ends this reconcile; the
        // balancer and console are only reconciled after `Ok(None)`.
        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        if self.config.create_balancers {
            result = balancer::Resources::new(&self.config, mz)
                .apply(&client, &mz.namespace())
                .await;
        }

        if !matches!(result, Ok(None)) {
            return result.map_err(Error::Anyhow);
        }

        if self.config.create_console {
            // Choose the console image tag that matches the environmentd image
            // tag via `--console-image-tag-map`, falling back to
            // `--console-image-tag-default`.
            // NOTE(review): `rsplit_once(':')` over the whole ref treats a
            // registry port as the tag when the ref has no tag (e.g.
            // `host:5000/repo/environmentd`), and yields the digest hex for
            // `@sha256:` refs. Such lookups simply fall back to the default tag.
            let Some((_, environmentd_image_tag)) = mz.spec.environmentd_image_ref.rsplit_once(':')
            else {
                return Err(Error::Anyhow(anyhow::anyhow!(
                    "failed to parse environmentd image ref: {}",
                    mz.spec.environmentd_image_ref
                )));
            };
            let console_image_tag = self
                .config
                .console_image_tag_map
                .iter()
                .find(|kv| kv.key == environmentd_image_tag)
                .map(|kv| kv.value.clone())
                .unwrap_or_else(|| self.config.console_image_tag_default.clone());
            // Console errors propagate directly through `?`.
            console::Resources::new(
                &self.config,
                mz,
                &matching_image_from_environmentd_image_ref(
                    &mz.spec.environmentd_image_ref,
                    "console",
                    Some(&console_image_tag),
                ),
            )
            .apply(&client, &mz.namespace())
            .await?;
        }

        result.map_err(Error::Anyhow)
    }

    /// Finalizer hook: clears the `needs_update` bookkeeping for `mz`.
    ///
    /// No Kubernetes objects are deleted here. NOTE(review): presumably child
    /// resources are garbage-collected through owner references — confirm.
    #[instrument(fields(organization_name=mz.name_unchecked()))]
    async fn cleanup(
        &self,
        _client: Client,
        mz: &Self::Resource,
    ) -> Result<Option<Action>, Self::Error> {
        self.set_needs_update(mz, false);

        Ok(None)
    }
}
617
/// Builds a reference to image `image_name` in the same registry namespace
/// as `environmentd_image_ref`.
///
/// - The namespace is everything before the last `/` of the environmentd
///   ref, or `materialize` when the ref contains no `/`.
/// - The tag is `image_tag` when given. Otherwise it is the environmentd ref's
///   own tag, or `unstable` when that ref has no tag.
///
/// Only the final path component is searched for a `:tag`, so a registry
/// port (`host:5000/repo/environmentd`) is never mistaken for a tag. A
/// trailing `@digest` is ignored, because it pins the environmentd image and
/// cannot be reused for another image.
fn matching_image_from_environmentd_image_ref(
    environmentd_image_ref: &str,
    image_name: &str,
    image_tag: Option<&str>,
) -> String {
    let (namespace, name_and_tag) = environmentd_image_ref
        .rsplit_once('/')
        .unwrap_or(("materialize", environmentd_image_ref));
    let tag = image_tag.unwrap_or_else(|| {
        // Strip any `@digest` before looking for the `:tag`.
        let name_and_tag = name_and_tag
            .split_once('@')
            .map_or(name_and_tag, |(without_digest, _digest)| without_digest);
        name_and_tag
            .rsplit_once(':')
            .map_or("unstable", |(_name, tag)| tag)
    });
    format!("{namespace}/{image_name}:{tag}")
}