1use std::sync::Arc;
11
12use k8s_openapi::{
13 api::{
14 apps::v1::{Deployment, DeploymentSpec},
15 core::v1::{
16 Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
17 EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
18 ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
19 ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
20 },
21 networking::v1::{
22 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
23 NetworkPolicySpec,
24 },
25 },
26 apimachinery::pkg::{
27 apis::meta::v1::{Condition, LabelSelector, Time},
28 util::intstr::IntOrString,
29 },
30 jiff::Timestamp,
31};
32use kube::{
33 Api, Client, Resource, ResourceExt,
34 api::{DeleteParams, ObjectMeta, PostParams},
35 runtime::{
36 conditions::is_deployment_completed,
37 controller::Action,
38 reflector::{ObjectRef, Store},
39 wait::await_condition,
40 },
41};
42use maplit::btreemap;
43use serde::Serialize;
44use tracing::{trace, warn};
45
46use crate::{
47 Error,
48 k8s::{apply_resource, make_reflector, replace_resource},
49 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
50};
51use mz_cloud_resources::crd::{
52 ManagedResource,
53 console::v1alpha1::{Console, HttpConnectionScheme},
54 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
55};
56use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
57use mz_ore::{cli::KeyValueArg, instrument};
58use mz_server_core::listeners::AuthenticatorKind;
59
/// Operator-level settings that shape every console resource this controller creates.
pub struct Config {
    /// When true, the console container gets a restrictive `SecurityContext`
    /// (non-root, all capabilities dropped, `RuntimeDefault` seccomp, no
    /// privilege escalation).
    pub enable_security_context: bool,
    /// NOTE(review): not read anywhere in this module's visible code; presumably
    /// toggles Prometheus scrape annotations — confirm whether it is still needed.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy applied to the console container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Optional scheduler name copied into the console pod spec.
    pub scheduler_name: Option<String>,
    /// Key/value pairs that become the console pod's `nodeSelector`.
    pub console_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied verbatim into the console pod spec.
    pub console_affinity: Option<Affinity>,
    /// Tolerations copied verbatim into the console pod spec.
    pub console_tolerations: Option<Vec<Toleration>>,
    /// Resource requirements used when the `Console` spec does not set its own.
    pub console_default_resources: Option<ResourceRequirements>,
    /// Whether to create an ingress `NetworkPolicy` for the console pods.
    pub network_policies_ingress_enabled: bool,
    /// CIDR blocks allowed to reach the console HTTP port when ingress
    /// network policies are enabled.
    pub network_policies_ingress_cidrs: Vec<String>,

    /// Default certificate specs; `console_external` is merged with the
    /// `Console`'s own external certificate spec.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port the console's nginx listens on. Also used for the container port,
    /// the service port, the probes, and the network policy.
    pub console_http_port: u16,
    /// Port of balancerd's HTTP listener, embedded in the `MZ_ENDPOINT` URL.
    pub balancerd_http_port: u16,
}
78
/// Contents of `app-config.json`, stored in the console ConfigMap and mounted
/// into the container under `/usr/share/nginx/html/app-config`.
///
/// Field names and order define the serialized JSON; do not rename or reorder
/// them without updating the console frontend that reads this file.
#[derive(Serialize)]
struct AppConfig {
    /// Version string derived from the console image reference's tag.
    version: String,
    /// Authentication settings for the console.
    auth: AppConfigAuth,
}
84
/// The `auth` section of `app-config.json`.
#[derive(Serialize)]
struct AppConfigAuth {
    /// Authenticator kind, copied from the `Console` spec's `authenticator_kind`.
    mode: AuthenticatorKind,
}
89
/// Reconciliation context for `Console` resources.
pub struct Context {
    /// Operator configuration applied to every console.
    config: Config,
    /// Reflector store of `Deployment`s, used to inspect existing console
    /// deployments without issuing API reads.
    deployments: Store<Deployment>,
}
94
95impl Context {
96 pub async fn new(config: Config, client: Client) -> Self {
97 Self {
98 config,
99 deployments: make_reflector(client).await,
100 }
101 }
102
103 async fn sync_deployment_status(
104 &self,
105 client: &Client,
106 console: &Console,
107 ) -> Result<(), kube::Error> {
108 let namespace = console.namespace();
109 let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
110
111 let Some(deployment) = self
112 .deployments
113 .get(&ObjectRef::new(&console.deployment_name()).within(&namespace))
114 else {
115 return Ok(());
116 };
117
118 let Some(deployment_conditions) = &deployment
119 .status
120 .as_ref()
121 .and_then(|status| status.conditions.as_ref())
122 else {
123 return Ok(());
126 };
127
128 let ready = deployment_conditions
129 .iter()
130 .any(|condition| condition.type_ == "Available" && condition.status == "True");
131 let ready_str = if ready { "True" } else { "False" };
132
133 let mut status = console.status.clone().unwrap();
134 if status
135 .conditions
136 .iter()
137 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
138 {
139 return Ok(());
143 }
144
145 status.conditions = vec![Condition {
146 type_: "Ready".to_string(),
147 status: ready_str.to_string(),
148 last_transition_time: Time(Timestamp::now()),
149 message: format!(
150 "console deployment is{} ready",
151 if ready { "" } else { " not" }
152 ),
153 observed_generation: None,
154 reason: "DeploymentStatus".to_string(),
155 }];
156 let mut new_console = console.clone();
157 new_console.status = Some(status);
158
159 console_api
160 .replace_status(
161 &console.name_unchecked(),
162 &PostParams::default(),
163 &new_console,
164 )
165 .await?;
166
167 Ok(())
168 }
169
170 fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
171 let mut network_policies = Vec::new();
172 if self.config.network_policies_ingress_enabled {
173 let console_label_selector = LabelSelector {
174 match_labels: Some(
175 console
176 .default_labels()
177 .into_iter()
178 .chain([("materialize.cloud/app".to_owned(), console.app_name())])
179 .collect(),
180 ),
181 ..Default::default()
182 };
183 network_policies.extend([NetworkPolicy {
184 metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
185 spec: Some(NetworkPolicySpec {
186 ingress: Some(vec![NetworkPolicyIngressRule {
187 from: Some(
188 self.config
189 .network_policies_ingress_cidrs
190 .iter()
191 .map(|cidr| NetworkPolicyPeer {
192 ip_block: Some(IPBlock {
193 cidr: cidr.to_owned(),
194 except: None,
195 }),
196 ..Default::default()
197 })
198 .collect(),
199 ),
200 ports: Some(vec![NetworkPolicyPort {
201 port: Some(IntOrString::Int(self.config.console_http_port.into())),
202 protocol: Some("TCP".to_string()),
203 ..Default::default()
204 }]),
205 ..Default::default()
206 }]),
207 pod_selector: Some(console_label_selector),
208 policy_types: Some(vec!["Ingress".to_owned()]),
209 ..Default::default()
210 }),
211 }]);
212 }
213 network_policies
214 }
215
216 fn create_console_external_certificate(
217 &self,
218 console: &Console,
219 ) -> anyhow::Result<Option<Certificate>> {
220 create_certificate(
221 self.config
222 .default_certificate_specs
223 .console_external
224 .clone(),
225 console,
226 console.spec.external_certificate_spec.clone(),
227 console.external_certificate_name(),
228 console.external_certificate_secret_name(),
229 None,
230 CertificatePrivateKeyAlgorithm::Ecdsa,
231 Some(256),
232 )
233 }
234
235 fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
236 let version: String = console
237 .spec
238 .console_image_ref
239 .rsplitn(2, ':')
240 .next()
241 .expect("at least one chunk, even if empty")
242 .to_owned();
243 let app_config_json = serde_json::to_string(&AppConfig {
244 version,
245 auth: AppConfigAuth {
246 mode: console.spec.authenticator_kind,
247 },
248 })
249 .expect("known valid");
250 ConfigMap {
251 binary_data: None,
252 data: Some(btreemap! {
253 "app-config.json".to_owned() => app_config_json,
254 }),
255 immutable: None,
256 metadata: console.managed_resource_meta(console.configmap_name()),
257 }
258 }
259
260 fn create_console_deployment_object(&self, console: &Console) -> Deployment {
261 let mut pod_template_labels = console.default_labels();
262 pod_template_labels.insert(
263 "materialize.cloud/name".to_owned(),
264 console.deployment_name(),
265 );
266 pod_template_labels.insert("app".to_owned(), "console".to_string());
267 pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
268
269 let ports = vec![ContainerPort {
270 container_port: self.config.console_http_port.into(),
271 name: Some("http".into()),
272 protocol: Some("TCP".into()),
273 ..Default::default()
274 }];
275
276 let scheme = match console.spec.balancerd.scheme {
277 HttpConnectionScheme::Http => "http",
278 HttpConnectionScheme::Https => "https",
279 };
280 let mut env = vec![EnvVar {
281 name: "MZ_ENDPOINT".to_string(),
282 value: Some(format!(
283 "{}://{}.{}.svc.cluster.local:{}",
284 scheme,
285 console.spec.balancerd.service_name,
286 console.spec.balancerd.namespace,
287 self.config.balancerd_http_port,
288 )),
289 ..Default::default()
290 }];
291 let mut volumes = vec![Volume {
292 name: "app-config".to_string(),
293 config_map: Some(ConfigMapVolumeSource {
294 name: console.configmap_name(),
295 default_mode: Some(256),
296 optional: Some(false),
297 items: Some(vec![KeyToPath {
298 key: "app-config.json".to_string(),
299 path: "app-config.json".to_string(),
300 ..Default::default()
301 }]),
302 }),
303 ..Default::default()
304 }];
305 let mut volume_mounts = vec![VolumeMount {
306 name: "app-config".to_string(),
307 mount_path: "/usr/share/nginx/html/app-config".to_string(),
308 ..Default::default()
309 }];
310
311 let scheme = if issuer_ref_defined(
312 &self.config.default_certificate_specs.console_external,
313 &console.spec.external_certificate_spec,
314 ) {
315 volumes.push(Volume {
316 name: "external-certificate".to_owned(),
317 secret: Some(SecretVolumeSource {
318 default_mode: Some(0o400),
319 secret_name: Some(console.external_certificate_secret_name()),
320 items: None,
321 optional: Some(false),
322 }),
323 ..Default::default()
324 });
325 volume_mounts.push(VolumeMount {
326 name: "external-certificate".to_owned(),
327 mount_path: "/nginx/tls".to_owned(),
328 read_only: Some(true),
329 ..Default::default()
330 });
331 env.push(EnvVar {
332 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
333 value: Some(format!(
334 "listen {} ssl;
335ssl_certificate /nginx/tls/tls.crt;
336ssl_certificate_key /nginx/tls/tls.key;",
337 self.config.console_http_port
338 )),
339 ..Default::default()
340 });
341 Some("HTTPS".to_owned())
342 } else {
343 env.push(EnvVar {
344 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
345 value: Some(format!("listen {};", self.config.console_http_port)),
346 ..Default::default()
347 });
348 Some("HTTP".to_owned())
349 };
350
351 let probe = Probe {
352 http_get: Some(HTTPGetAction {
353 path: Some("/".to_string()),
354 port: IntOrString::Int(self.config.console_http_port.into()),
355 scheme,
356 ..Default::default()
357 }),
358 ..Default::default()
359 };
360
361 let security_context = if self.config.enable_security_context {
362 Some(SecurityContext {
366 run_as_non_root: Some(true),
367 capabilities: Some(Capabilities {
368 drop: Some(vec!["ALL".to_string()]),
369 ..Default::default()
370 }),
371 seccomp_profile: Some(SeccompProfile {
372 type_: "RuntimeDefault".to_string(),
373 ..Default::default()
374 }),
375 allow_privilege_escalation: Some(false),
376 ..Default::default()
377 })
378 } else {
379 None
380 };
381
382 let container = Container {
383 name: "console".to_owned(),
384 image: Some(console.spec.console_image_ref.clone()),
385 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
386 ports: Some(ports),
387 env: Some(env),
388 startup_probe: Some(Probe {
389 period_seconds: Some(1),
390 failure_threshold: Some(10),
391 ..probe.clone()
392 }),
393 readiness_probe: Some(Probe {
394 period_seconds: Some(30),
395 failure_threshold: Some(1),
396 ..probe.clone()
397 }),
398 liveness_probe: Some(Probe {
399 period_seconds: Some(30),
400 ..probe.clone()
401 }),
402 resources: console
403 .spec
404 .resource_requirements
405 .clone()
406 .or_else(|| self.config.console_default_resources.clone()),
407 security_context,
408 volume_mounts: Some(volume_mounts),
409 ..Default::default()
410 };
411
412 let deployment_spec = DeploymentSpec {
413 replicas: Some(console.replicas()),
414 selector: LabelSelector {
415 match_labels: Some(pod_template_labels.clone()),
416 ..Default::default()
417 },
418 template: PodTemplateSpec {
419 metadata: Some(ObjectMeta {
422 labels: Some(pod_template_labels),
423 ..Default::default()
424 }),
425 spec: Some(PodSpec {
426 containers: vec![container],
427 node_selector: Some(
428 self.config
429 .console_node_selector
430 .iter()
431 .map(|selector| (selector.key.clone(), selector.value.clone()))
432 .collect(),
433 ),
434 affinity: self.config.console_affinity.clone(),
435 tolerations: self.config.console_tolerations.clone(),
436 scheduler_name: self.config.scheduler_name.clone(),
437 volumes: Some(volumes),
438 security_context: Some(PodSecurityContext {
439 fs_group: Some(101),
440 ..Default::default()
441 }),
442 ..Default::default()
443 }),
444 },
445 ..Default::default()
446 };
447
448 Deployment {
449 metadata: ObjectMeta {
450 ..console.managed_resource_meta(console.deployment_name())
451 },
452 spec: Some(deployment_spec),
453 status: None,
454 }
455 }
456
457 fn create_console_service_object(&self, console: &Console) -> Service {
458 let selector =
459 btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
460
461 let ports = vec![ServicePort {
462 name: Some("http".to_string()),
463 protocol: Some("TCP".to_string()),
464 port: self.config.console_http_port.into(),
465 target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
466 ..Default::default()
467 }];
468
469 let spec = ServiceSpec {
470 type_: Some("ClusterIP".to_string()),
471 cluster_ip: Some("None".to_string()),
472 selector: Some(selector),
473 ports: Some(ports),
474 ..Default::default()
475 };
476
477 Service {
478 metadata: console.managed_resource_meta(console.service_name()),
479 spec: Some(spec),
480 status: None,
481 }
482 }
483
484 async fn fix_deployment(
487 &self,
488 deployment_api: &Api<Deployment>,
489 new_deployment: &Deployment,
490 ) -> Result<(), Error> {
491 let Some(mut existing_deployment) = self
492 .deployments
493 .get(
494 &ObjectRef::new(&new_deployment.name_unchecked())
495 .within(&new_deployment.namespace().unwrap()),
496 )
497 .map(Arc::unwrap_or_clone)
498 else {
499 return Ok(());
500 };
501
502 if existing_deployment.spec.as_ref().unwrap().selector
503 == new_deployment.spec.as_ref().unwrap().selector
504 {
505 return Ok(());
506 }
507
508 warn!("found existing deployment with old label selector, fixing");
509
510 existing_deployment
513 .spec
514 .as_mut()
515 .unwrap()
516 .template
517 .metadata
518 .as_mut()
519 .unwrap()
520 .labels = new_deployment
521 .spec
522 .as_ref()
523 .unwrap()
524 .template
525 .metadata
526 .as_ref()
527 .unwrap()
528 .labels
529 .clone();
530
531 replace_resource(deployment_api, &existing_deployment).await?;
535 await_condition(
536 deployment_api.clone(),
537 &existing_deployment.name_unchecked(),
538 |deployment: Option<&Deployment>| {
539 let observed_generation = deployment
540 .and_then(|deployment| deployment.status.as_ref())
541 .and_then(|status| status.observed_generation)
542 .unwrap_or(0);
543 let current_generation = deployment
544 .and_then(|deployment| deployment.meta().generation)
545 .unwrap_or(0);
546 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
547 observed_generation == current_generation
548 && current_generation > previous_generation
549 },
550 )
551 .await
552 .map_err(|e| anyhow::anyhow!(e))?;
553 await_condition(
554 deployment_api.clone(),
555 &existing_deployment.name_unchecked(),
556 is_deployment_completed(),
557 )
558 .await
559 .map_err(|e| anyhow::anyhow!(e))?;
560
561 match kube::runtime::wait::delete::delete_and_finalize(
564 deployment_api.clone(),
565 &existing_deployment.name_unchecked(),
566 &DeleteParams::orphan(),
567 )
568 .await
569 {
570 Ok(_) => {}
571 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
572 if e.code == 404 =>
573 {
574 }
576 Err(e) => return Err(anyhow::anyhow!(e).into()),
577 }
578
579 Ok(())
585 }
586}
587
588#[async_trait::async_trait]
589impl k8s_controller::Context for Context {
590 type Resource = Console;
591 type Error = Error;
592
593 #[instrument(fields())]
594 async fn apply(
595 &self,
596 client: Client,
597 console: &Self::Resource,
598 ) -> Result<Option<Action>, Self::Error> {
599 if console.status.is_none() {
600 let console_api: Api<Console> =
601 Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
602 let mut new_console = console.clone();
603 new_console.status = Some(console.status());
604 console_api
605 .replace_status(
606 &console.name_unchecked(),
607 &PostParams::default(),
608 &new_console,
609 )
610 .await?;
611 return Ok(None);
614 }
615
616 let namespace = console.namespace();
617 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
618 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
619 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
620 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
621 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
622
623 trace!("creating new network policies");
624 let network_policies = self.create_network_policies(console);
625 for network_policy in &network_policies {
626 apply_resource(&network_policy_api, network_policy).await?;
627 }
628
629 trace!("creating new console configmap");
630 let console_configmap = self.create_console_app_configmap_object(console);
631 apply_resource(&configmap_api, &console_configmap).await?;
632
633 trace!("creating new console deployment");
634 let console_deployment = self.create_console_deployment_object(console);
635 self.fix_deployment(&deployment_api, &console_deployment)
636 .await?;
637 apply_resource(&deployment_api, &console_deployment).await?;
638
639 trace!("creating new console service");
640 let console_service = self.create_console_service_object(console);
641 apply_resource(&service_api, &console_service).await?;
642
643 let console_external_certificate = self.create_console_external_certificate(console)?;
644 if let Some(certificate) = &console_external_certificate {
645 trace!("creating new console external certificate");
646 apply_resource(&certificate_api, certificate).await?;
647 }
648
649 self.sync_deployment_status(&client, console).await?;
650
651 Ok(None)
652 }
653}