1use k8s_openapi::{
11 api::{
12 apps::v1::{Deployment, DeploymentSpec},
13 core::v1::{
14 Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
15 EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16 ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
17 ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
18 },
19 networking::v1::{
20 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21 NetworkPolicySpec,
22 },
23 },
24 apimachinery::pkg::{
25 apis::meta::v1::{Condition, LabelSelector, Time},
26 util::intstr::IntOrString,
27 },
28 jiff::Timestamp,
29};
30use kube::{
31 Api, Client, Resource, ResourceExt,
32 api::{DeleteParams, ObjectMeta, PostParams},
33 runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
34};
35use maplit::btreemap;
36use serde::Serialize;
37use tracing::{trace, warn};
38
39use crate::{
40 Error,
41 k8s::{apply_resource, get_resource, replace_resource},
42 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
43};
44use mz_cloud_resources::crd::{
45 ManagedResource,
46 console::v1alpha1::{Console, HttpConnectionScheme},
47 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
48};
49use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
50use mz_ore::{cli::KeyValueArg, instrument};
51use mz_server_core::listeners::AuthenticatorKind;
52
53pub struct Config {
54 pub enable_security_context: bool,
55 pub enable_prometheus_scrape_annotations: bool,
56
57 pub image_pull_policy: KubernetesImagePullPolicy,
58 pub scheduler_name: Option<String>,
59 pub console_node_selector: Vec<KeyValueArg<String, String>>,
60 pub console_affinity: Option<Affinity>,
61 pub console_tolerations: Option<Vec<Toleration>>,
62 pub console_default_resources: Option<ResourceRequirements>,
63 pub network_policies_ingress_enabled: bool,
64 pub network_policies_ingress_cidrs: Vec<String>,
65
66 pub default_certificate_specs: DefaultCertificateSpecs,
67
68 pub console_http_port: u16,
69 pub balancerd_http_port: u16,
70}
71
72#[derive(Serialize)]
73struct AppConfig {
74 version: String,
75 auth: AppConfigAuth,
76}
77
78#[derive(Serialize)]
79struct AppConfigAuth {
80 mode: AuthenticatorKind,
81}
82
83pub struct Context {
84 config: Config,
85}
86
87impl Context {
88 pub fn new(config: Config) -> Self {
89 Self { config }
90 }
91
92 async fn sync_deployment_status(
93 &self,
94 client: &Client,
95 console: &Console,
96 ) -> Result<(), Error> {
97 let namespace = console.namespace();
98 let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
99 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
100
101 let Some(deployment) = get_resource(&deployment_api, &console.deployment_name()).await?
102 else {
103 return Ok(());
104 };
105
106 let Some(deployment_conditions) = &deployment
107 .status
108 .as_ref()
109 .and_then(|status| status.conditions.as_ref())
110 else {
111 return Ok(());
114 };
115
116 let ready = deployment_conditions
117 .iter()
118 .any(|condition| condition.type_ == "Available" && condition.status == "True");
119 let ready_str = if ready { "True" } else { "False" };
120
121 let mut status = console.status.clone().unwrap();
122 if status
123 .conditions
124 .iter()
125 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
126 {
127 return Ok(());
131 }
132
133 status.conditions = vec![Condition {
134 type_: "Ready".to_string(),
135 status: ready_str.to_string(),
136 last_transition_time: Time(Timestamp::now()),
137 message: format!(
138 "console deployment is{} ready",
139 if ready { "" } else { " not" }
140 ),
141 observed_generation: None,
142 reason: "DeploymentStatus".to_string(),
143 }];
144 let mut new_console = console.clone();
145 new_console.status = Some(status);
146
147 console_api
148 .replace_status(
149 &console.name_unchecked(),
150 &PostParams::default(),
151 &new_console,
152 )
153 .await?;
154
155 Ok(())
156 }
157
158 fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
159 let mut network_policies = Vec::new();
160 if self.config.network_policies_ingress_enabled {
161 let console_label_selector = LabelSelector {
162 match_labels: Some(
163 console
164 .default_labels()
165 .into_iter()
166 .chain([("materialize.cloud/app".to_owned(), console.app_name())])
167 .collect(),
168 ),
169 ..Default::default()
170 };
171 network_policies.extend([NetworkPolicy {
172 metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
173 spec: Some(NetworkPolicySpec {
174 ingress: Some(vec![NetworkPolicyIngressRule {
175 from: Some(
176 self.config
177 .network_policies_ingress_cidrs
178 .iter()
179 .map(|cidr| NetworkPolicyPeer {
180 ip_block: Some(IPBlock {
181 cidr: cidr.to_owned(),
182 except: None,
183 }),
184 ..Default::default()
185 })
186 .collect(),
187 ),
188 ports: Some(vec![NetworkPolicyPort {
189 port: Some(IntOrString::Int(self.config.console_http_port.into())),
190 protocol: Some("TCP".to_string()),
191 ..Default::default()
192 }]),
193 ..Default::default()
194 }]),
195 pod_selector: Some(console_label_selector),
196 policy_types: Some(vec!["Ingress".to_owned()]),
197 ..Default::default()
198 }),
199 }]);
200 }
201 network_policies
202 }
203
204 fn create_console_external_certificate(
205 &self,
206 console: &Console,
207 ) -> anyhow::Result<Option<Certificate>> {
208 create_certificate(
209 self.config
210 .default_certificate_specs
211 .console_external
212 .clone(),
213 console,
214 console.spec.external_certificate_spec.clone(),
215 console.external_certificate_name(),
216 console.external_certificate_secret_name(),
217 None,
218 CertificatePrivateKeyAlgorithm::Ecdsa,
219 Some(256),
220 )
221 }
222
223 fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
224 let version: String = console
225 .spec
226 .console_image_ref
227 .rsplitn(2, ':')
228 .next()
229 .expect("at least one chunk, even if empty")
230 .to_owned();
231 let app_config_json = serde_json::to_string(&AppConfig {
232 version,
233 auth: AppConfigAuth {
234 mode: console.spec.authenticator_kind,
235 },
236 })
237 .expect("known valid");
238 ConfigMap {
239 binary_data: None,
240 data: Some(btreemap! {
241 "app-config.json".to_owned() => app_config_json,
242 }),
243 immutable: None,
244 metadata: console.managed_resource_meta(console.configmap_name()),
245 }
246 }
247
248 fn create_console_deployment_object(&self, console: &Console) -> Deployment {
249 let mut pod_template_labels = console.default_labels();
250 pod_template_labels.insert(
251 "materialize.cloud/name".to_owned(),
252 console.deployment_name(),
253 );
254 pod_template_labels.insert("app".to_owned(), "console".to_string());
255 pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
256
257 let ports = vec![ContainerPort {
258 container_port: self.config.console_http_port.into(),
259 name: Some("http".into()),
260 protocol: Some("TCP".into()),
261 ..Default::default()
262 }];
263
264 let scheme = match console.spec.balancerd.scheme {
265 HttpConnectionScheme::Http => "http",
266 HttpConnectionScheme::Https => "https",
267 };
268 let mut env = vec![EnvVar {
269 name: "MZ_ENDPOINT".to_string(),
270 value: Some(format!(
271 "{}://{}.{}.svc.cluster.local:{}",
272 scheme,
273 console.spec.balancerd.service_name,
274 console.spec.balancerd.namespace,
275 self.config.balancerd_http_port,
276 )),
277 ..Default::default()
278 }];
279 let mut volumes = vec![Volume {
280 name: "app-config".to_string(),
281 config_map: Some(ConfigMapVolumeSource {
282 name: console.configmap_name(),
283 default_mode: Some(256),
284 optional: Some(false),
285 items: Some(vec![KeyToPath {
286 key: "app-config.json".to_string(),
287 path: "app-config.json".to_string(),
288 ..Default::default()
289 }]),
290 }),
291 ..Default::default()
292 }];
293 let mut volume_mounts = vec![VolumeMount {
294 name: "app-config".to_string(),
295 mount_path: "/usr/share/nginx/html/app-config".to_string(),
296 ..Default::default()
297 }];
298
299 let scheme = if issuer_ref_defined(
300 &self.config.default_certificate_specs.console_external,
301 &console.spec.external_certificate_spec,
302 ) {
303 volumes.push(Volume {
304 name: "external-certificate".to_owned(),
305 secret: Some(SecretVolumeSource {
306 default_mode: Some(0o400),
307 secret_name: Some(console.external_certificate_secret_name()),
308 items: None,
309 optional: Some(false),
310 }),
311 ..Default::default()
312 });
313 volume_mounts.push(VolumeMount {
314 name: "external-certificate".to_owned(),
315 mount_path: "/nginx/tls".to_owned(),
316 read_only: Some(true),
317 ..Default::default()
318 });
319 env.push(EnvVar {
320 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
321 value: Some(format!(
322 "listen {} ssl;
323ssl_certificate /nginx/tls/tls.crt;
324ssl_certificate_key /nginx/tls/tls.key;",
325 self.config.console_http_port
326 )),
327 ..Default::default()
328 });
329 Some("HTTPS".to_owned())
330 } else {
331 env.push(EnvVar {
332 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
333 value: Some(format!("listen {};", self.config.console_http_port)),
334 ..Default::default()
335 });
336 Some("HTTP".to_owned())
337 };
338
339 let probe = Probe {
340 http_get: Some(HTTPGetAction {
341 path: Some("/".to_string()),
342 port: IntOrString::Int(self.config.console_http_port.into()),
343 scheme,
344 ..Default::default()
345 }),
346 ..Default::default()
347 };
348
349 let security_context = if self.config.enable_security_context {
350 Some(SecurityContext {
354 run_as_non_root: Some(true),
355 capabilities: Some(Capabilities {
356 drop: Some(vec!["ALL".to_string()]),
357 ..Default::default()
358 }),
359 seccomp_profile: Some(SeccompProfile {
360 type_: "RuntimeDefault".to_string(),
361 ..Default::default()
362 }),
363 allow_privilege_escalation: Some(false),
364 ..Default::default()
365 })
366 } else {
367 None
368 };
369
370 let container = Container {
371 name: "console".to_owned(),
372 image: Some(console.spec.console_image_ref.clone()),
373 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
374 ports: Some(ports),
375 env: Some(env),
376 startup_probe: Some(Probe {
377 period_seconds: Some(1),
378 failure_threshold: Some(10),
379 ..probe.clone()
380 }),
381 readiness_probe: Some(Probe {
382 period_seconds: Some(30),
383 failure_threshold: Some(1),
384 ..probe.clone()
385 }),
386 liveness_probe: Some(Probe {
387 period_seconds: Some(30),
388 ..probe.clone()
389 }),
390 resources: console
391 .spec
392 .resource_requirements
393 .clone()
394 .or_else(|| self.config.console_default_resources.clone()),
395 security_context,
396 volume_mounts: Some(volume_mounts),
397 ..Default::default()
398 };
399
400 let deployment_spec = DeploymentSpec {
401 replicas: Some(console.replicas()),
402 selector: LabelSelector {
403 match_labels: Some(pod_template_labels.clone()),
404 ..Default::default()
405 },
406 template: PodTemplateSpec {
407 metadata: Some(ObjectMeta {
410 labels: Some(pod_template_labels),
411 ..Default::default()
412 }),
413 spec: Some(PodSpec {
414 containers: vec![container],
415 node_selector: Some(
416 self.config
417 .console_node_selector
418 .iter()
419 .map(|selector| (selector.key.clone(), selector.value.clone()))
420 .collect(),
421 ),
422 affinity: self.config.console_affinity.clone(),
423 tolerations: self.config.console_tolerations.clone(),
424 scheduler_name: self.config.scheduler_name.clone(),
425 volumes: Some(volumes),
426 security_context: Some(PodSecurityContext {
427 fs_group: Some(101),
428 ..Default::default()
429 }),
430 ..Default::default()
431 }),
432 },
433 ..Default::default()
434 };
435
436 Deployment {
437 metadata: ObjectMeta {
438 ..console.managed_resource_meta(console.deployment_name())
439 },
440 spec: Some(deployment_spec),
441 status: None,
442 }
443 }
444
445 fn create_console_service_object(&self, console: &Console) -> Service {
446 let selector =
447 btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
448
449 let ports = vec![ServicePort {
450 name: Some("http".to_string()),
451 protocol: Some("TCP".to_string()),
452 port: self.config.console_http_port.into(),
453 target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
454 ..Default::default()
455 }];
456
457 let spec = ServiceSpec {
458 type_: Some("ClusterIP".to_string()),
459 cluster_ip: Some("None".to_string()),
460 selector: Some(selector),
461 ports: Some(ports),
462 ..Default::default()
463 };
464
465 Service {
466 metadata: console.managed_resource_meta(console.service_name()),
467 spec: Some(spec),
468 status: None,
469 }
470 }
471
472 async fn fix_deployment(
475 &self,
476 deployment_api: &Api<Deployment>,
477 new_deployment: &Deployment,
478 ) -> Result<(), Error> {
479 let Some(mut existing_deployment) =
480 get_resource(deployment_api, &new_deployment.name_unchecked()).await?
481 else {
482 return Ok(());
483 };
484
485 if existing_deployment.spec.as_ref().unwrap().selector
486 == new_deployment.spec.as_ref().unwrap().selector
487 {
488 return Ok(());
489 }
490
491 warn!("found existing deployment with old label selector, fixing");
492
493 existing_deployment
496 .spec
497 .as_mut()
498 .unwrap()
499 .template
500 .metadata
501 .as_mut()
502 .unwrap()
503 .labels = new_deployment
504 .spec
505 .as_ref()
506 .unwrap()
507 .template
508 .metadata
509 .as_ref()
510 .unwrap()
511 .labels
512 .clone();
513
514 replace_resource(deployment_api, &existing_deployment).await?;
518 await_condition(
519 deployment_api.clone(),
520 &existing_deployment.name_unchecked(),
521 |deployment: Option<&Deployment>| {
522 let observed_generation = deployment
523 .and_then(|deployment| deployment.status.as_ref())
524 .and_then(|status| status.observed_generation)
525 .unwrap_or(0);
526 let current_generation = deployment
527 .and_then(|deployment| deployment.meta().generation)
528 .unwrap_or(0);
529 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
530 observed_generation == current_generation
531 && current_generation > previous_generation
532 },
533 )
534 .await
535 .map_err(|e| anyhow::anyhow!(e))?;
536 await_condition(
537 deployment_api.clone(),
538 &existing_deployment.name_unchecked(),
539 is_deployment_completed(),
540 )
541 .await
542 .map_err(|e| anyhow::anyhow!(e))?;
543
544 match kube::runtime::wait::delete::delete_and_finalize(
547 deployment_api.clone(),
548 &existing_deployment.name_unchecked(),
549 &DeleteParams::orphan(),
550 )
551 .await
552 {
553 Ok(_) => {}
554 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
555 if e.code == 404 =>
556 {
557 }
559 Err(e) => return Err(anyhow::anyhow!(e).into()),
560 }
561
562 Ok(())
568 }
569}
570
571#[async_trait::async_trait]
572impl k8s_controller::Context for Context {
573 type Resource = Console;
574 type Error = Error;
575
576 #[instrument(fields())]
577 async fn apply(
578 &self,
579 client: Client,
580 console: &Self::Resource,
581 ) -> Result<Option<Action>, Self::Error> {
582 if console.status.is_none() {
583 let console_api: Api<Console> =
584 Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
585 let mut new_console = console.clone();
586 new_console.status = Some(console.status());
587 console_api
588 .replace_status(
589 &console.name_unchecked(),
590 &PostParams::default(),
591 &new_console,
592 )
593 .await?;
594 return Ok(None);
597 }
598
599 let namespace = console.namespace();
600 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
601 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
602 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
603 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
604 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
605
606 trace!("creating new network policies");
607 let network_policies = self.create_network_policies(console);
608 for network_policy in &network_policies {
609 apply_resource(&network_policy_api, network_policy).await?;
610 }
611
612 trace!("creating new console configmap");
613 let console_configmap = self.create_console_app_configmap_object(console);
614 apply_resource(&configmap_api, &console_configmap).await?;
615
616 trace!("creating new console deployment");
617 let console_deployment = self.create_console_deployment_object(console);
618 self.fix_deployment(&deployment_api, &console_deployment)
619 .await?;
620 apply_resource(&deployment_api, &console_deployment).await?;
621
622 trace!("creating new console service");
623 let console_service = self.create_console_service_object(console);
624 apply_resource(&service_api, &console_service).await?;
625
626 let console_external_certificate = self.create_console_external_certificate(console)?;
627 if let Some(certificate) = &console_external_certificate {
628 trace!("creating new console external certificate");
629 apply_resource(&certificate_api, certificate).await?;
630 }
631
632 self.sync_deployment_status(&client, console).await?;
633
634 Ok(None)
635 }
636}