1use k8s_controller::TraceMetadata;
11use k8s_openapi::{
12 api::{
13 apps::v1::{Deployment, DeploymentSpec},
14 core::v1::{
15 Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
16 EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
17 ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18 ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
19 },
20 networking::v1::{
21 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
22 NetworkPolicySpec,
23 },
24 },
25 apimachinery::pkg::{
26 apis::meta::v1::{Condition, LabelSelector, Time},
27 util::intstr::IntOrString,
28 },
29 jiff::Timestamp,
30};
31use kube::{
32 Api, Client, Resource, ResourceExt,
33 api::{DeleteParams, ObjectMeta, PostParams},
34 runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
35};
36use maplit::btreemap;
37use serde::Serialize;
38use tracing::{trace, warn};
39
40use crate::{
41 Error,
42 k8s::{apply_resource, get_resource, replace_resource},
43 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
44};
45use mz_cloud_resources::crd::{
46 ManagedResource,
47 console::v1alpha1::{Console, HttpConnectionScheme},
48 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
49};
50use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
51use mz_ore::{cli::KeyValueArg, instrument};
52use mz_server_core::listeners::AuthenticatorKind;
53
/// Operator-level settings used when building the Kubernetes objects for a
/// `Console` resource.
pub struct Config {
    /// When `true`, the console container is given a restrictive
    /// `SecurityContext` (non-root, all capabilities dropped, `RuntimeDefault`
    /// seccomp profile, no privilege escalation); when `false`, none is set.
    pub enable_security_context: bool,
    /// NOTE(review): not read anywhere in this part of the file; presumably
    /// controls Prometheus scrape annotations elsewhere — confirm.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy written onto the console container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Optional scheduler name copied into the console pod spec.
    pub scheduler_name: Option<String>,
    /// Key/value pairs that become the console pod's node selector.
    pub console_node_selector: Vec<KeyValueArg<String, String>>,
    /// Optional affinity copied into the console pod spec.
    pub console_affinity: Option<Affinity>,
    /// Optional tolerations copied into the console pod spec.
    pub console_tolerations: Option<Vec<Toleration>>,
    /// Resource requirements used for the console container when the
    /// `Console` spec does not provide its own.
    pub console_default_resources: Option<ResourceRequirements>,
    /// Whether an ingress `NetworkPolicy` is generated for the console pods.
    pub network_policies_ingress_enabled: bool,
    /// CIDR blocks listed as allowed sources in the ingress `NetworkPolicy`.
    pub network_policies_ingress_cidrs: Vec<String>,

    /// Default certificate specs; the `console_external` entry is used for
    /// the console's external certificate.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port the console serves on; used for the container port, the service
    /// port, the probes, the nginx `listen` directive and the network policy.
    pub console_http_port: u16,
    /// Port appended to the balancerd URL exposed through `MZ_ENDPOINT`.
    pub balancerd_http_port: u16,
}
72
/// Payload serialized to JSON and stored under the `app-config.json` key of
/// the console `ConfigMap`.
#[derive(Serialize)]
struct AppConfig {
    /// Version string derived from the console image reference.
    version: String,
    /// Authentication settings handed to the console.
    auth: AppConfigAuth,
    /// DNS names taken from the `Console` spec's balancerd section; the key is
    /// left out of the JSON entirely when this is `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    balancerd_dns_names: Option<Vec<String>>,
}
80
/// Authentication section of [`AppConfig`].
#[derive(Serialize)]
struct AppConfigAuth {
    /// Authenticator kind copied from the `Console` spec.
    mode: AuthenticatorKind,
}
85
/// Controller context for `Console` resources; wraps the operator [`Config`]
/// that every generated Kubernetes object is derived from.
pub struct Context {
    /// Operator-level settings consulted while building resources.
    config: Config,
}
89
90impl Context {
91 pub fn new(config: Config) -> Self {
92 Self { config }
93 }
94
95 async fn sync_deployment_status(
96 &self,
97 client: &Client,
98 console: &Console,
99 ) -> Result<(), Error> {
100 let namespace = console.namespace();
101 let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
102 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
103
104 let Some(deployment) = get_resource(&deployment_api, &console.deployment_name()).await?
105 else {
106 return Ok(());
107 };
108
109 let Some(deployment_conditions) = &deployment
110 .status
111 .as_ref()
112 .and_then(|status| status.conditions.as_ref())
113 else {
114 return Ok(());
117 };
118
119 let ready = deployment_conditions
120 .iter()
121 .any(|condition| condition.type_ == "Available" && condition.status == "True");
122 let ready_str = if ready { "True" } else { "False" };
123
124 let mut status = console.status.clone().unwrap();
125 if status
126 .conditions
127 .iter()
128 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
129 {
130 return Ok(());
134 }
135
136 status.conditions = vec![Condition {
137 type_: "Ready".to_string(),
138 status: ready_str.to_string(),
139 last_transition_time: Time(Timestamp::now()),
140 message: format!(
141 "console deployment is{} ready",
142 if ready { "" } else { " not" }
143 ),
144 observed_generation: None,
145 reason: "DeploymentStatus".to_string(),
146 }];
147 let mut new_console = console.clone();
148 new_console.status = Some(status);
149
150 console_api
151 .replace_status(
152 &console.name_unchecked(),
153 &PostParams::default(),
154 &new_console,
155 )
156 .await?;
157
158 Ok(())
159 }
160
161 fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
162 let mut network_policies = Vec::new();
163 if self.config.network_policies_ingress_enabled {
164 let console_label_selector = LabelSelector {
165 match_labels: Some(
166 console
167 .default_labels()
168 .into_iter()
169 .chain([("materialize.cloud/app".to_owned(), console.app_name())])
170 .collect(),
171 ),
172 ..Default::default()
173 };
174 network_policies.extend([NetworkPolicy {
175 metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
176 spec: Some(NetworkPolicySpec {
177 ingress: Some(vec![NetworkPolicyIngressRule {
178 from: Some(
179 self.config
180 .network_policies_ingress_cidrs
181 .iter()
182 .map(|cidr| NetworkPolicyPeer {
183 ip_block: Some(IPBlock {
184 cidr: cidr.to_owned(),
185 except: None,
186 }),
187 ..Default::default()
188 })
189 .collect(),
190 ),
191 ports: Some(vec![NetworkPolicyPort {
192 port: Some(IntOrString::Int(self.config.console_http_port.into())),
193 protocol: Some("TCP".to_string()),
194 ..Default::default()
195 }]),
196 ..Default::default()
197 }]),
198 pod_selector: Some(console_label_selector),
199 policy_types: Some(vec!["Ingress".to_owned()]),
200 ..Default::default()
201 }),
202 }]);
203 }
204 network_policies
205 }
206
207 fn create_console_external_certificate(
208 &self,
209 console: &Console,
210 ) -> anyhow::Result<Option<Certificate>> {
211 create_certificate(
212 self.config
213 .default_certificate_specs
214 .console_external
215 .clone(),
216 console,
217 console.spec.external_certificate_spec.clone(),
218 console.external_certificate_name(),
219 console.external_certificate_secret_name(),
220 None,
221 CertificatePrivateKeyAlgorithm::Ecdsa,
222 Some(256),
223 )
224 }
225
226 fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
227 let balancerd_dns_names = console.spec.balancerd.dns_names.clone();
228 let version: String = console
229 .spec
230 .console_image_ref
231 .rsplitn(2, ':')
232 .next()
233 .expect("at least one chunk, even if empty")
234 .to_owned();
235 let app_config_json = serde_json::to_string(&AppConfig {
236 version,
237 balancerd_dns_names,
238 auth: AppConfigAuth {
239 mode: console.spec.authenticator_kind,
240 },
241 })
242 .expect("known valid");
243 ConfigMap {
244 binary_data: None,
245 data: Some(btreemap! {
246 "app-config.json".to_owned() => app_config_json,
247 }),
248 immutable: None,
249 metadata: console.managed_resource_meta(console.configmap_name()),
250 }
251 }
252
253 fn create_console_deployment_object(&self, console: &Console) -> Deployment {
254 let mut pod_template_labels = console.default_labels();
255 pod_template_labels.insert(
256 "materialize.cloud/name".to_owned(),
257 console.deployment_name(),
258 );
259 pod_template_labels.insert("app".to_owned(), "console".to_string());
260 pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
261
262 let ports = vec![ContainerPort {
263 container_port: self.config.console_http_port.into(),
264 name: Some("http".into()),
265 protocol: Some("TCP".into()),
266 ..Default::default()
267 }];
268
269 let scheme = match console.spec.balancerd.scheme {
270 HttpConnectionScheme::Http => "http",
271 HttpConnectionScheme::Https => "https",
272 };
273 let mut env = vec![EnvVar {
274 name: "MZ_ENDPOINT".to_string(),
275 value: Some(format!(
276 "{}://{}.{}.svc.cluster.local:{}",
277 scheme,
278 console.spec.balancerd.service_name,
279 console.spec.balancerd.namespace,
280 self.config.balancerd_http_port,
281 )),
282 ..Default::default()
283 }];
284 let mut volumes = vec![Volume {
285 name: "app-config".to_string(),
286 config_map: Some(ConfigMapVolumeSource {
287 name: console.configmap_name(),
288 default_mode: Some(256),
289 optional: Some(false),
290 items: Some(vec![KeyToPath {
291 key: "app-config.json".to_string(),
292 path: "app-config.json".to_string(),
293 ..Default::default()
294 }]),
295 }),
296 ..Default::default()
297 }];
298 let mut volume_mounts = vec![VolumeMount {
299 name: "app-config".to_string(),
300 mount_path: "/usr/share/nginx/html/app-config".to_string(),
301 ..Default::default()
302 }];
303
304 let scheme = if issuer_ref_defined(
305 &self.config.default_certificate_specs.console_external,
306 &console.spec.external_certificate_spec,
307 ) {
308 volumes.push(Volume {
309 name: "external-certificate".to_owned(),
310 secret: Some(SecretVolumeSource {
311 default_mode: Some(0o400),
312 secret_name: Some(console.external_certificate_secret_name()),
313 items: None,
314 optional: Some(false),
315 }),
316 ..Default::default()
317 });
318 volume_mounts.push(VolumeMount {
319 name: "external-certificate".to_owned(),
320 mount_path: "/nginx/tls".to_owned(),
321 read_only: Some(true),
322 ..Default::default()
323 });
324 env.push(EnvVar {
325 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
326 value: Some(format!(
327 "listen {} ssl;
328ssl_certificate /nginx/tls/tls.crt;
329ssl_certificate_key /nginx/tls/tls.key;",
330 self.config.console_http_port
331 )),
332 ..Default::default()
333 });
334 Some("HTTPS".to_owned())
335 } else {
336 env.push(EnvVar {
337 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
338 value: Some(format!("listen {};", self.config.console_http_port)),
339 ..Default::default()
340 });
341 Some("HTTP".to_owned())
342 };
343
344 let probe = Probe {
345 http_get: Some(HTTPGetAction {
346 path: Some("/".to_string()),
347 port: IntOrString::Int(self.config.console_http_port.into()),
348 scheme,
349 ..Default::default()
350 }),
351 ..Default::default()
352 };
353
354 let security_context = if self.config.enable_security_context {
355 Some(SecurityContext {
359 run_as_non_root: Some(true),
360 capabilities: Some(Capabilities {
361 drop: Some(vec!["ALL".to_string()]),
362 ..Default::default()
363 }),
364 seccomp_profile: Some(SeccompProfile {
365 type_: "RuntimeDefault".to_string(),
366 ..Default::default()
367 }),
368 allow_privilege_escalation: Some(false),
369 ..Default::default()
370 })
371 } else {
372 None
373 };
374
375 let container = Container {
376 name: "console".to_owned(),
377 image: Some(console.spec.console_image_ref.clone()),
378 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
379 ports: Some(ports),
380 env: Some(env),
381 startup_probe: Some(Probe {
382 period_seconds: Some(1),
383 failure_threshold: Some(10),
384 ..probe.clone()
385 }),
386 readiness_probe: Some(Probe {
387 period_seconds: Some(30),
388 failure_threshold: Some(1),
389 ..probe.clone()
390 }),
391 liveness_probe: Some(Probe {
392 period_seconds: Some(30),
393 ..probe.clone()
394 }),
395 resources: console
396 .spec
397 .resource_requirements
398 .clone()
399 .or_else(|| self.config.console_default_resources.clone()),
400 security_context,
401 volume_mounts: Some(volume_mounts),
402 ..Default::default()
403 };
404
405 let deployment_spec = DeploymentSpec {
406 replicas: Some(console.replicas()),
407 selector: LabelSelector {
408 match_labels: Some(pod_template_labels.clone()),
409 ..Default::default()
410 },
411 template: PodTemplateSpec {
412 metadata: Some(ObjectMeta {
415 labels: Some(pod_template_labels),
416 ..Default::default()
417 }),
418 spec: Some(PodSpec {
419 containers: vec![container],
420 node_selector: Some(
421 self.config
422 .console_node_selector
423 .iter()
424 .map(|selector| (selector.key.clone(), selector.value.clone()))
425 .collect(),
426 ),
427 affinity: self.config.console_affinity.clone(),
428 tolerations: self.config.console_tolerations.clone(),
429 scheduler_name: self.config.scheduler_name.clone(),
430 volumes: Some(volumes),
431 security_context: Some(PodSecurityContext {
432 fs_group: Some(101),
433 ..Default::default()
434 }),
435 ..Default::default()
436 }),
437 },
438 ..Default::default()
439 };
440
441 Deployment {
442 metadata: ObjectMeta {
443 ..console.managed_resource_meta(console.deployment_name())
444 },
445 spec: Some(deployment_spec),
446 status: None,
447 }
448 }
449
450 fn create_console_service_object(&self, console: &Console) -> Service {
451 let selector =
452 btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
453
454 let ports = vec![ServicePort {
455 name: Some("http".to_string()),
456 protocol: Some("TCP".to_string()),
457 port: self.config.console_http_port.into(),
458 target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
459 ..Default::default()
460 }];
461
462 let spec = ServiceSpec {
463 type_: Some("ClusterIP".to_string()),
464 cluster_ip: Some("None".to_string()),
465 selector: Some(selector),
466 ports: Some(ports),
467 ..Default::default()
468 };
469
470 Service {
471 metadata: console.managed_resource_meta(console.service_name()),
472 spec: Some(spec),
473 status: None,
474 }
475 }
476
477 async fn fix_deployment(
480 &self,
481 deployment_api: &Api<Deployment>,
482 new_deployment: &Deployment,
483 ) -> Result<(), Error> {
484 let Some(mut existing_deployment) =
485 get_resource(deployment_api, &new_deployment.name_unchecked()).await?
486 else {
487 return Ok(());
488 };
489
490 if existing_deployment.spec.as_ref().unwrap().selector
491 == new_deployment.spec.as_ref().unwrap().selector
492 {
493 return Ok(());
494 }
495
496 warn!("found existing deployment with old label selector, fixing");
497
498 existing_deployment
501 .spec
502 .as_mut()
503 .unwrap()
504 .template
505 .metadata
506 .as_mut()
507 .unwrap()
508 .labels = new_deployment
509 .spec
510 .as_ref()
511 .unwrap()
512 .template
513 .metadata
514 .as_ref()
515 .unwrap()
516 .labels
517 .clone();
518
519 replace_resource(deployment_api, &existing_deployment).await?;
523 await_condition(
524 deployment_api.clone(),
525 &existing_deployment.name_unchecked(),
526 |deployment: Option<&Deployment>| {
527 let observed_generation = deployment
528 .and_then(|deployment| deployment.status.as_ref())
529 .and_then(|status| status.observed_generation)
530 .unwrap_or(0);
531 let current_generation = deployment
532 .and_then(|deployment| deployment.meta().generation)
533 .unwrap_or(0);
534 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
535 observed_generation == current_generation
536 && current_generation > previous_generation
537 },
538 )
539 .await
540 .map_err(|e| anyhow::anyhow!(e))?;
541 await_condition(
542 deployment_api.clone(),
543 &existing_deployment.name_unchecked(),
544 is_deployment_completed(),
545 )
546 .await
547 .map_err(|e| anyhow::anyhow!(e))?;
548
549 match kube::runtime::wait::delete::delete_and_finalize(
552 deployment_api.clone(),
553 &existing_deployment.name_unchecked(),
554 &DeleteParams::orphan(),
555 )
556 .await
557 {
558 Ok(_) => {}
559 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
560 if e.code == 404 =>
561 {
562 }
564 Err(e) => return Err(anyhow::anyhow!(e).into()),
565 }
566
567 Ok(())
573 }
574}
575
576#[async_trait::async_trait]
577impl k8s_controller::Context for Context {
578 type Resource = Console;
579 type Error = Error;
580
581 #[instrument(fields())]
582 async fn apply(
583 &self,
584 client: Client,
585 console: &Self::Resource,
586 _metadata: &mut TraceMetadata,
587 ) -> Result<Option<Action>, Self::Error> {
588 if console.status.is_none() {
589 let console_api: Api<Console> =
590 Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
591 let mut new_console = console.clone();
592 new_console.status = Some(console.status());
593 console_api
594 .replace_status(
595 &console.name_unchecked(),
596 &PostParams::default(),
597 &new_console,
598 )
599 .await?;
600 return Ok(None);
603 }
604
605 let namespace = console.namespace();
606 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
607 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
608 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
609 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
610 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
611
612 trace!("creating new network policies");
613 let network_policies = self.create_network_policies(console);
614 for network_policy in &network_policies {
615 apply_resource(&network_policy_api, network_policy).await?;
616 }
617
618 trace!("creating new console configmap");
619 let console_configmap = self.create_console_app_configmap_object(console);
620 apply_resource(&configmap_api, &console_configmap).await?;
621
622 trace!("creating new console deployment");
623 let console_deployment = self.create_console_deployment_object(console);
624 self.fix_deployment(&deployment_api, &console_deployment)
625 .await?;
626 apply_resource(&deployment_api, &console_deployment).await?;
627
628 trace!("creating new console service");
629 let console_service = self.create_console_service_object(console);
630 apply_resource(&service_api, &console_service).await?;
631
632 let console_external_certificate = self.create_console_external_certificate(console)?;
633 if let Some(certificate) = &console_external_certificate {
634 trace!("creating new console external certificate");
635 apply_resource(&certificate_api, certificate).await?;
636 }
637
638 self.sync_deployment_status(&client, console).await?;
639
640 Ok(None)
641 }
642}