1use std::sync::Arc;
11
12use k8s_openapi::{
13 api::{
14 apps::v1::{Deployment, DeploymentSpec},
15 core::v1::{
16 Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
17 EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
18 ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
19 ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
20 },
21 networking::v1::{
22 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
23 NetworkPolicySpec,
24 },
25 },
26 apimachinery::pkg::{
27 apis::meta::v1::{Condition, LabelSelector, Time},
28 util::intstr::IntOrString,
29 },
30};
31use kube::{
32 Api, Client, Resource, ResourceExt,
33 api::{DeleteParams, ObjectMeta, PostParams},
34 runtime::{
35 conditions::is_deployment_completed,
36 controller::Action,
37 reflector::{ObjectRef, Store},
38 wait::await_condition,
39 },
40};
41use maplit::btreemap;
42use serde::Serialize;
43use tracing::{trace, warn};
44
45use crate::{
46 Error,
47 k8s::{apply_resource, make_reflector, replace_resource},
48 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
49};
50use mz_cloud_resources::crd::{
51 ManagedResource,
52 console::v1alpha1::{Console, HttpConnectionScheme},
53 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
54};
55use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
56use mz_ore::{cli::KeyValueArg, instrument};
57use mz_server_core::listeners::AuthenticatorKind;
58
59pub struct Config {
60 pub enable_security_context: bool,
61 pub enable_prometheus_scrape_annotations: bool,
62
63 pub image_pull_policy: KubernetesImagePullPolicy,
64 pub scheduler_name: Option<String>,
65 pub console_node_selector: Vec<KeyValueArg<String, String>>,
66 pub console_affinity: Option<Affinity>,
67 pub console_tolerations: Option<Vec<Toleration>>,
68 pub console_default_resources: Option<ResourceRequirements>,
69 pub network_policies_ingress_enabled: bool,
70 pub network_policies_ingress_cidrs: Vec<String>,
71
72 pub default_certificate_specs: DefaultCertificateSpecs,
73
74 pub console_http_port: u16,
75 pub balancerd_http_port: u16,
76}
77
78#[derive(Serialize)]
79struct AppConfig {
80 version: String,
81 auth: AppConfigAuth,
82}
83
84#[derive(Serialize)]
85struct AppConfigAuth {
86 mode: AuthenticatorKind,
87}
88
89pub struct Context {
90 config: Config,
91 deployments: Store<Deployment>,
92}
93
94impl Context {
95 pub async fn new(config: Config, client: Client) -> Self {
96 Self {
97 config,
98 deployments: make_reflector(client).await,
99 }
100 }
101
102 async fn sync_deployment_status(
103 &self,
104 client: &Client,
105 console: &Console,
106 ) -> Result<(), kube::Error> {
107 let namespace = console.namespace();
108 let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
109
110 let Some(deployment) = self
111 .deployments
112 .get(&ObjectRef::new(&console.deployment_name()).within(&namespace))
113 else {
114 return Ok(());
115 };
116
117 let Some(deployment_conditions) = &deployment
118 .status
119 .as_ref()
120 .and_then(|status| status.conditions.as_ref())
121 else {
122 return Ok(());
125 };
126
127 let ready = deployment_conditions
128 .iter()
129 .any(|condition| condition.type_ == "Available" && condition.status == "True");
130 let ready_str = if ready { "True" } else { "False" };
131
132 let mut status = console.status.clone().unwrap();
133 if status
134 .conditions
135 .iter()
136 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
137 {
138 return Ok(());
142 }
143
144 status.conditions = vec![Condition {
145 type_: "Ready".to_string(),
146 status: ready_str.to_string(),
147 last_transition_time: Time(chrono::offset::Utc::now()),
148 message: format!(
149 "console deployment is{} ready",
150 if ready { "" } else { " not" }
151 ),
152 observed_generation: None,
153 reason: "DeploymentStatus".to_string(),
154 }];
155 let mut new_console = console.clone();
156 new_console.status = Some(status);
157
158 console_api
159 .replace_status(
160 &console.name_unchecked(),
161 &PostParams::default(),
162 serde_json::to_vec(&new_console).unwrap(),
163 )
164 .await?;
165
166 Ok(())
167 }
168
169 fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
170 let mut network_policies = Vec::new();
171 if self.config.network_policies_ingress_enabled {
172 let console_label_selector = LabelSelector {
173 match_labels: Some(
174 console
175 .default_labels()
176 .into_iter()
177 .chain([("materialize.cloud/app".to_owned(), console.app_name())])
178 .collect(),
179 ),
180 ..Default::default()
181 };
182 network_policies.extend([NetworkPolicy {
183 metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
184 spec: Some(NetworkPolicySpec {
185 ingress: Some(vec![NetworkPolicyIngressRule {
186 from: Some(
187 self.config
188 .network_policies_ingress_cidrs
189 .iter()
190 .map(|cidr| NetworkPolicyPeer {
191 ip_block: Some(IPBlock {
192 cidr: cidr.to_owned(),
193 except: None,
194 }),
195 ..Default::default()
196 })
197 .collect(),
198 ),
199 ports: Some(vec![NetworkPolicyPort {
200 port: Some(IntOrString::Int(self.config.console_http_port.into())),
201 protocol: Some("TCP".to_string()),
202 ..Default::default()
203 }]),
204 ..Default::default()
205 }]),
206 pod_selector: Some(console_label_selector),
207 policy_types: Some(vec!["Ingress".to_owned()]),
208 ..Default::default()
209 }),
210 }]);
211 }
212 network_policies
213 }
214
215 fn create_console_external_certificate(&self, console: &Console) -> Option<Certificate> {
216 create_certificate(
217 self.config
218 .default_certificate_specs
219 .console_external
220 .clone(),
221 console,
222 console.spec.external_certificate_spec.clone(),
223 console.external_certificate_name(),
224 console.external_certificate_secret_name(),
225 None,
226 CertificatePrivateKeyAlgorithm::Rsa,
227 Some(4096),
228 )
229 }
230
231 fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
232 let version: String = console
233 .spec
234 .console_image_ref
235 .rsplitn(2, ':')
236 .next()
237 .expect("at least one chunk, even if empty")
238 .to_owned();
239 let app_config_json = serde_json::to_string(&AppConfig {
240 version,
241 auth: AppConfigAuth {
242 mode: console.spec.authenticator_kind,
243 },
244 })
245 .expect("known valid");
246 ConfigMap {
247 binary_data: None,
248 data: Some(btreemap! {
249 "app-config.json".to_owned() => app_config_json,
250 }),
251 immutable: None,
252 metadata: console.managed_resource_meta(console.configmap_name()),
253 }
254 }
255
256 fn create_console_deployment_object(&self, console: &Console) -> Deployment {
257 let mut pod_template_labels = console.default_labels();
258 pod_template_labels.insert(
259 "materialize.cloud/name".to_owned(),
260 console.deployment_name(),
261 );
262 pod_template_labels.insert("app".to_owned(), "console".to_string());
263 pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
264
265 let ports = vec![ContainerPort {
266 container_port: self.config.console_http_port.into(),
267 name: Some("http".into()),
268 protocol: Some("TCP".into()),
269 ..Default::default()
270 }];
271
272 let scheme = match console.spec.balancerd.scheme {
273 HttpConnectionScheme::Http => "http",
274 HttpConnectionScheme::Https => "https",
275 };
276 let mut env = vec![EnvVar {
277 name: "MZ_ENDPOINT".to_string(),
278 value: Some(format!(
279 "{}://{}.{}.svc.cluster.local:{}",
280 scheme,
281 console.spec.balancerd.service_name,
282 console.spec.balancerd.namespace,
283 self.config.balancerd_http_port,
284 )),
285 ..Default::default()
286 }];
287 let mut volumes = vec![Volume {
288 name: "app-config".to_string(),
289 config_map: Some(ConfigMapVolumeSource {
290 name: console.configmap_name(),
291 default_mode: Some(256),
292 optional: Some(false),
293 items: Some(vec![KeyToPath {
294 key: "app-config.json".to_string(),
295 path: "app-config.json".to_string(),
296 ..Default::default()
297 }]),
298 }),
299 ..Default::default()
300 }];
301 let mut volume_mounts = vec![VolumeMount {
302 name: "app-config".to_string(),
303 mount_path: "/usr/share/nginx/html/app-config".to_string(),
304 ..Default::default()
305 }];
306
307 let scheme = if issuer_ref_defined(
308 &self.config.default_certificate_specs.console_external,
309 &console.spec.external_certificate_spec,
310 ) {
311 volumes.push(Volume {
312 name: "external-certificate".to_owned(),
313 secret: Some(SecretVolumeSource {
314 default_mode: Some(0o400),
315 secret_name: Some(console.external_certificate_secret_name()),
316 items: None,
317 optional: Some(false),
318 }),
319 ..Default::default()
320 });
321 volume_mounts.push(VolumeMount {
322 name: "external-certificate".to_owned(),
323 mount_path: "/nginx/tls".to_owned(),
324 read_only: Some(true),
325 ..Default::default()
326 });
327 env.push(EnvVar {
328 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
329 value: Some(format!(
330 "listen {} ssl;
331ssl_certificate /nginx/tls/tls.crt;
332ssl_certificate_key /nginx/tls/tls.key;",
333 self.config.console_http_port
334 )),
335 ..Default::default()
336 });
337 Some("HTTPS".to_owned())
338 } else {
339 env.push(EnvVar {
340 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
341 value: Some(format!("listen {};", self.config.console_http_port)),
342 ..Default::default()
343 });
344 Some("HTTP".to_owned())
345 };
346
347 let probe = Probe {
348 http_get: Some(HTTPGetAction {
349 path: Some("/".to_string()),
350 port: IntOrString::Int(self.config.console_http_port.into()),
351 scheme,
352 ..Default::default()
353 }),
354 ..Default::default()
355 };
356
357 let security_context = if self.config.enable_security_context {
358 Some(SecurityContext {
362 run_as_non_root: Some(true),
363 capabilities: Some(Capabilities {
364 drop: Some(vec!["ALL".to_string()]),
365 ..Default::default()
366 }),
367 seccomp_profile: Some(SeccompProfile {
368 type_: "RuntimeDefault".to_string(),
369 ..Default::default()
370 }),
371 allow_privilege_escalation: Some(false),
372 ..Default::default()
373 })
374 } else {
375 None
376 };
377
378 let container = Container {
379 name: "console".to_owned(),
380 image: Some(console.spec.console_image_ref.clone()),
381 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
382 ports: Some(ports),
383 env: Some(env),
384 startup_probe: Some(Probe {
385 period_seconds: Some(1),
386 failure_threshold: Some(10),
387 ..probe.clone()
388 }),
389 readiness_probe: Some(Probe {
390 period_seconds: Some(30),
391 failure_threshold: Some(1),
392 ..probe.clone()
393 }),
394 liveness_probe: Some(Probe {
395 period_seconds: Some(30),
396 ..probe.clone()
397 }),
398 resources: console
399 .spec
400 .resource_requirements
401 .clone()
402 .or_else(|| self.config.console_default_resources.clone()),
403 security_context,
404 volume_mounts: Some(volume_mounts),
405 ..Default::default()
406 };
407
408 let deployment_spec = DeploymentSpec {
409 replicas: Some(console.replicas()),
410 selector: LabelSelector {
411 match_labels: Some(pod_template_labels.clone()),
412 ..Default::default()
413 },
414 template: PodTemplateSpec {
415 metadata: Some(ObjectMeta {
418 labels: Some(pod_template_labels),
419 ..Default::default()
420 }),
421 spec: Some(PodSpec {
422 containers: vec![container],
423 node_selector: Some(
424 self.config
425 .console_node_selector
426 .iter()
427 .map(|selector| (selector.key.clone(), selector.value.clone()))
428 .collect(),
429 ),
430 affinity: self.config.console_affinity.clone(),
431 tolerations: self.config.console_tolerations.clone(),
432 scheduler_name: self.config.scheduler_name.clone(),
433 volumes: Some(volumes),
434 security_context: Some(PodSecurityContext {
435 fs_group: Some(101),
436 ..Default::default()
437 }),
438 ..Default::default()
439 }),
440 },
441 ..Default::default()
442 };
443
444 Deployment {
445 metadata: ObjectMeta {
446 ..console.managed_resource_meta(console.deployment_name())
447 },
448 spec: Some(deployment_spec),
449 status: None,
450 }
451 }
452
453 fn create_console_service_object(&self, console: &Console) -> Service {
454 let selector =
455 btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
456
457 let ports = vec![ServicePort {
458 name: Some("http".to_string()),
459 protocol: Some("TCP".to_string()),
460 port: self.config.console_http_port.into(),
461 target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
462 ..Default::default()
463 }];
464
465 let spec = ServiceSpec {
466 type_: Some("ClusterIP".to_string()),
467 cluster_ip: Some("None".to_string()),
468 selector: Some(selector),
469 ports: Some(ports),
470 ..Default::default()
471 };
472
473 Service {
474 metadata: console.managed_resource_meta(console.service_name()),
475 spec: Some(spec),
476 status: None,
477 }
478 }
479
480 async fn fix_deployment(
483 &self,
484 deployment_api: &Api<Deployment>,
485 new_deployment: &Deployment,
486 ) -> Result<(), Error> {
487 let Some(mut existing_deployment) = self
488 .deployments
489 .get(
490 &ObjectRef::new(&new_deployment.name_unchecked())
491 .within(&new_deployment.namespace().unwrap()),
492 )
493 .map(Arc::unwrap_or_clone)
494 else {
495 return Ok(());
496 };
497
498 if existing_deployment.spec.as_ref().unwrap().selector
499 == new_deployment.spec.as_ref().unwrap().selector
500 {
501 return Ok(());
502 }
503
504 warn!("found existing deployment with old label selector, fixing");
505
506 existing_deployment
509 .spec
510 .as_mut()
511 .unwrap()
512 .template
513 .metadata
514 .as_mut()
515 .unwrap()
516 .labels = new_deployment
517 .spec
518 .as_ref()
519 .unwrap()
520 .template
521 .metadata
522 .as_ref()
523 .unwrap()
524 .labels
525 .clone();
526
527 replace_resource(deployment_api, &existing_deployment).await?;
531 await_condition(
532 deployment_api.clone(),
533 &existing_deployment.name_unchecked(),
534 |deployment: Option<&Deployment>| {
535 let observed_generation = deployment
536 .and_then(|deployment| deployment.status.as_ref())
537 .and_then(|status| status.observed_generation)
538 .unwrap_or(0);
539 let current_generation = deployment
540 .and_then(|deployment| deployment.meta().generation)
541 .unwrap_or(0);
542 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
543 observed_generation == current_generation
544 && current_generation > previous_generation
545 },
546 )
547 .await
548 .map_err(|e| anyhow::anyhow!(e))?;
549 await_condition(
550 deployment_api.clone(),
551 &existing_deployment.name_unchecked(),
552 is_deployment_completed(),
553 )
554 .await
555 .map_err(|e| anyhow::anyhow!(e))?;
556
557 match kube::runtime::wait::delete::delete_and_finalize(
560 deployment_api.clone(),
561 &existing_deployment.name_unchecked(),
562 &DeleteParams::orphan(),
563 )
564 .await
565 {
566 Ok(_) => {}
567 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
568 if e.code == 404 =>
569 {
570 }
572 Err(e) => return Err(anyhow::anyhow!(e).into()),
573 }
574
575 Ok(())
581 }
582}
583
584#[async_trait::async_trait]
585impl k8s_controller::Context for Context {
586 type Resource = Console;
587 type Error = Error;
588
589 #[instrument(fields())]
590 async fn apply(
591 &self,
592 client: Client,
593 console: &Self::Resource,
594 ) -> Result<Option<Action>, Self::Error> {
595 if console.status.is_none() {
596 let console_api: Api<Console> =
597 Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
598 let mut new_console = console.clone();
599 new_console.status = Some(console.status());
600 console_api
601 .replace_status(
602 &console.name_unchecked(),
603 &PostParams::default(),
604 serde_json::to_vec(&new_console).unwrap(),
605 )
606 .await?;
607 return Ok(None);
610 }
611
612 let namespace = console.namespace();
613 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
614 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
615 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
616 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
617 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
618
619 trace!("creating new network policies");
620 let network_policies = self.create_network_policies(console);
621 for network_policy in &network_policies {
622 apply_resource(&network_policy_api, network_policy).await?;
623 }
624
625 trace!("creating new console configmap");
626 let console_configmap = self.create_console_app_configmap_object(console);
627 apply_resource(&configmap_api, &console_configmap).await?;
628
629 trace!("creating new console deployment");
630 let console_deployment = self.create_console_deployment_object(console);
631 self.fix_deployment(&deployment_api, &console_deployment)
632 .await?;
633 apply_resource(&deployment_api, &console_deployment).await?;
634
635 trace!("creating new console service");
636 let console_service = self.create_console_service_object(console);
637 apply_resource(&service_api, &console_service).await?;
638
639 let console_external_certificate = self.create_console_external_certificate(console);
640 if let Some(certificate) = &console_external_certificate {
641 trace!("creating new console external certificate");
642 apply_resource(&certificate_api, certificate).await?;
643 }
644
645 self.sync_deployment_status(&client, console).await?;
646
647 Ok(None)
648 }
649}