1use k8s_openapi::{
11 api::{
12 apps::v1::{Deployment, DeploymentSpec},
13 core::v1::{
14 Affinity, Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort,
15 EnvVar, HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16 ResourceRequirements, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
17 ServicePort, ServiceSpec, Toleration, Volume, VolumeMount,
18 },
19 networking::v1::{
20 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21 NetworkPolicySpec,
22 },
23 },
24 apimachinery::pkg::{
25 apis::meta::v1::{Condition, LabelSelector, Time},
26 util::intstr::IntOrString,
27 },
28};
29use kube::{
30 Api, Client, Resource, ResourceExt,
31 api::{ObjectMeta, PostParams},
32 runtime::{
33 controller::Action,
34 reflector::{ObjectRef, Store},
35 },
36};
37use maplit::btreemap;
38use serde::Serialize;
39use tracing::trace;
40
41use crate::{
42 Error,
43 k8s::{apply_resource, make_reflector},
44 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
45};
46use mz_cloud_resources::crd::{
47 ManagedResource,
48 console::v1alpha1::{Console, HttpConnectionScheme},
49 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
50};
51use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
52use mz_ore::{cli::KeyValueArg, instrument};
53use mz_server_core::listeners::AuthenticatorKind;
54
55pub struct Config {
56 pub enable_security_context: bool,
57 pub enable_prometheus_scrape_annotations: bool,
58
59 pub image_pull_policy: KubernetesImagePullPolicy,
60 pub scheduler_name: Option<String>,
61 pub console_node_selector: Vec<KeyValueArg<String, String>>,
62 pub console_affinity: Option<Affinity>,
63 pub console_tolerations: Option<Vec<Toleration>>,
64 pub console_default_resources: Option<ResourceRequirements>,
65 pub network_policies_ingress_enabled: bool,
66 pub network_policies_ingress_cidrs: Vec<String>,
67
68 pub default_certificate_specs: DefaultCertificateSpecs,
69
70 pub console_http_port: u16,
71 pub balancerd_http_port: u16,
72}
73
74#[derive(Serialize)]
75struct AppConfig {
76 version: String,
77 auth: AppConfigAuth,
78}
79
80#[derive(Serialize)]
81struct AppConfigAuth {
82 mode: AuthenticatorKind,
83}
84
85pub struct Context {
86 config: Config,
87 deployments: Store<Deployment>,
88}
89
90impl Context {
91 pub async fn new(config: Config, client: Client) -> Self {
92 Self {
93 config,
94 deployments: make_reflector(client).await,
95 }
96 }
97
98 async fn sync_deployment_status(
99 &self,
100 client: &Client,
101 console: &Console,
102 ) -> Result<(), kube::Error> {
103 let namespace = console.namespace().unwrap();
104 let console_api: Api<Console> = Api::namespaced(client.clone(), &namespace);
105
106 let Some(deployment) = self
107 .deployments
108 .get(&ObjectRef::new(&console.deployment_name()).within(&namespace))
109 else {
110 return Ok(());
111 };
112
113 let Some(deployment_conditions) = &deployment
114 .status
115 .as_ref()
116 .and_then(|status| status.conditions.as_ref())
117 else {
118 return Ok(());
121 };
122
123 let ready = deployment_conditions
124 .iter()
125 .any(|condition| condition.type_ == "Available" && condition.status == "True");
126 let ready_str = if ready { "True" } else { "False" };
127
128 let mut status = console.status.clone().unwrap();
129 if status
130 .conditions
131 .iter()
132 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
133 {
134 return Ok(());
138 }
139
140 status.conditions = vec![Condition {
141 type_: "Ready".to_string(),
142 status: ready_str.to_string(),
143 last_transition_time: Time(chrono::offset::Utc::now()),
144 message: format!(
145 "console deployment is{} ready",
146 if ready { "" } else { " not" }
147 ),
148 observed_generation: None,
149 reason: "DeploymentStatus".to_string(),
150 }];
151 let mut new_console = console.clone();
152 new_console.status = Some(status);
153
154 console_api
155 .replace_status(
156 &console.name_unchecked(),
157 &PostParams::default(),
158 serde_json::to_vec(&new_console).unwrap(),
159 )
160 .await?;
161
162 Ok(())
163 }
164
165 fn create_network_policies(&self, console: &Console) -> Vec<NetworkPolicy> {
166 let mut network_policies = Vec::new();
167 if self.config.network_policies_ingress_enabled {
168 let console_label_selector = LabelSelector {
169 match_labels: Some(
170 console
171 .default_labels()
172 .into_iter()
173 .chain([("materialize.cloud/app".to_owned(), console.app_name())])
174 .collect(),
175 ),
176 ..Default::default()
177 };
178 network_policies.extend([NetworkPolicy {
179 metadata: console.managed_resource_meta(console.name_prefixed("console-ingress")),
180 spec: Some(NetworkPolicySpec {
181 ingress: Some(vec![NetworkPolicyIngressRule {
182 from: Some(
183 self.config
184 .network_policies_ingress_cidrs
185 .iter()
186 .map(|cidr| NetworkPolicyPeer {
187 ip_block: Some(IPBlock {
188 cidr: cidr.to_owned(),
189 except: None,
190 }),
191 ..Default::default()
192 })
193 .collect(),
194 ),
195 ports: Some(vec![NetworkPolicyPort {
196 port: Some(IntOrString::Int(self.config.console_http_port.into())),
197 protocol: Some("TCP".to_string()),
198 ..Default::default()
199 }]),
200 ..Default::default()
201 }]),
202 pod_selector: Some(console_label_selector),
203 policy_types: Some(vec!["Ingress".to_owned()]),
204 ..Default::default()
205 }),
206 }]);
207 }
208 network_policies
209 }
210
211 fn create_console_external_certificate(&self, console: &Console) -> Option<Certificate> {
212 create_certificate(
213 self.config
214 .default_certificate_specs
215 .console_external
216 .clone(),
217 console,
218 console.spec.external_certificate_spec.clone(),
219 console.external_certificate_name(),
220 console.external_certificate_secret_name(),
221 None,
222 CertificatePrivateKeyAlgorithm::Rsa,
223 Some(4096),
224 )
225 }
226
227 fn create_console_app_configmap_object(&self, console: &Console) -> ConfigMap {
228 let version: String = console
229 .spec
230 .console_image_ref
231 .rsplitn(2, ':')
232 .next()
233 .expect("at least one chunk, even if empty")
234 .to_owned();
235 let app_config_json = serde_json::to_string(&AppConfig {
236 version,
237 auth: AppConfigAuth {
238 mode: console.spec.authenticator_kind,
239 },
240 })
241 .expect("known valid");
242 ConfigMap {
243 binary_data: None,
244 data: Some(btreemap! {
245 "app-config.json".to_owned() => app_config_json,
246 }),
247 immutable: None,
248 metadata: console.managed_resource_meta(console.configmap_name()),
249 }
250 }
251
252 fn create_console_deployment_object(&self, console: &Console) -> Deployment {
253 let mut pod_template_labels = console.default_labels();
254 pod_template_labels.insert(
255 "materialize.cloud/name".to_owned(),
256 console.deployment_name(),
257 );
258 pod_template_labels.insert("app".to_owned(), "console".to_string());
259 pod_template_labels.insert("materialize.cloud/app".to_owned(), console.app_name());
260
261 let ports = vec![ContainerPort {
262 container_port: self.config.console_http_port.into(),
263 name: Some("http".into()),
264 protocol: Some("TCP".into()),
265 ..Default::default()
266 }];
267
268 let scheme = match console.spec.balancerd.scheme {
269 HttpConnectionScheme::Http => "http",
270 HttpConnectionScheme::Https => "https",
271 };
272 let mut env = vec![EnvVar {
273 name: "MZ_ENDPOINT".to_string(),
274 value: Some(format!(
275 "{}://{}.{}.svc.cluster.local:{}",
276 scheme,
277 console.spec.balancerd.service_name,
278 console.spec.balancerd.namespace,
279 self.config.balancerd_http_port,
280 )),
281 ..Default::default()
282 }];
283 let mut volumes = vec![Volume {
284 name: "app-config".to_string(),
285 config_map: Some(ConfigMapVolumeSource {
286 name: console.configmap_name(),
287 default_mode: Some(256),
288 optional: Some(false),
289 items: Some(vec![KeyToPath {
290 key: "app-config.json".to_string(),
291 path: "app-config.json".to_string(),
292 ..Default::default()
293 }]),
294 }),
295 ..Default::default()
296 }];
297 let mut volume_mounts = vec![VolumeMount {
298 name: "app-config".to_string(),
299 mount_path: "/usr/share/nginx/html/app-config".to_string(),
300 ..Default::default()
301 }];
302
303 let scheme = if issuer_ref_defined(
304 &self.config.default_certificate_specs.console_external,
305 &console.spec.external_certificate_spec,
306 ) {
307 volumes.push(Volume {
308 name: "external-certificate".to_owned(),
309 secret: Some(SecretVolumeSource {
310 default_mode: Some(0o400),
311 secret_name: Some(console.external_certificate_secret_name()),
312 items: None,
313 optional: Some(false),
314 }),
315 ..Default::default()
316 });
317 volume_mounts.push(VolumeMount {
318 name: "external-certificate".to_owned(),
319 mount_path: "/nginx/tls".to_owned(),
320 read_only: Some(true),
321 ..Default::default()
322 });
323 env.push(EnvVar {
324 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
325 value: Some(format!(
326 "listen {} ssl;
327ssl_certificate /nginx/tls/tls.crt;
328ssl_certificate_key /nginx/tls/tls.key;",
329 self.config.console_http_port
330 )),
331 ..Default::default()
332 });
333 Some("HTTPS".to_owned())
334 } else {
335 env.push(EnvVar {
336 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
337 value: Some(format!("listen {};", self.config.console_http_port)),
338 ..Default::default()
339 });
340 Some("HTTP".to_owned())
341 };
342
343 let probe = Probe {
344 http_get: Some(HTTPGetAction {
345 path: Some("/".to_string()),
346 port: IntOrString::Int(self.config.console_http_port.into()),
347 scheme,
348 ..Default::default()
349 }),
350 ..Default::default()
351 };
352
353 let security_context = if self.config.enable_security_context {
354 Some(SecurityContext {
358 run_as_non_root: Some(true),
359 capabilities: Some(Capabilities {
360 drop: Some(vec!["ALL".to_string()]),
361 ..Default::default()
362 }),
363 seccomp_profile: Some(SeccompProfile {
364 type_: "RuntimeDefault".to_string(),
365 ..Default::default()
366 }),
367 allow_privilege_escalation: Some(false),
368 ..Default::default()
369 })
370 } else {
371 None
372 };
373
374 let container = Container {
375 name: "console".to_owned(),
376 image: Some(console.spec.console_image_ref.clone()),
377 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
378 ports: Some(ports),
379 env: Some(env),
380 startup_probe: Some(Probe {
381 period_seconds: Some(1),
382 failure_threshold: Some(10),
383 ..probe.clone()
384 }),
385 readiness_probe: Some(Probe {
386 period_seconds: Some(30),
387 failure_threshold: Some(1),
388 ..probe.clone()
389 }),
390 liveness_probe: Some(Probe {
391 period_seconds: Some(30),
392 ..probe.clone()
393 }),
394 resources: console
395 .spec
396 .resource_requirements
397 .clone()
398 .or_else(|| self.config.console_default_resources.clone()),
399 security_context,
400 volume_mounts: Some(volume_mounts),
401 ..Default::default()
402 };
403
404 let deployment_spec = DeploymentSpec {
405 replicas: Some(console.replicas()),
406 selector: LabelSelector {
407 match_labels: Some(pod_template_labels.clone()),
408 ..Default::default()
409 },
410 template: PodTemplateSpec {
411 metadata: Some(ObjectMeta {
414 labels: Some(pod_template_labels),
415 ..Default::default()
416 }),
417 spec: Some(PodSpec {
418 containers: vec![container],
419 node_selector: Some(
420 self.config
421 .console_node_selector
422 .iter()
423 .map(|selector| (selector.key.clone(), selector.value.clone()))
424 .collect(),
425 ),
426 affinity: self.config.console_affinity.clone(),
427 tolerations: self.config.console_tolerations.clone(),
428 scheduler_name: self.config.scheduler_name.clone(),
429 volumes: Some(volumes),
430 security_context: Some(PodSecurityContext {
431 fs_group: Some(101),
432 ..Default::default()
433 }),
434 ..Default::default()
435 }),
436 },
437 ..Default::default()
438 };
439
440 Deployment {
441 metadata: ObjectMeta {
442 ..console.managed_resource_meta(console.deployment_name())
443 },
444 spec: Some(deployment_spec),
445 status: None,
446 }
447 }
448
449 fn create_console_service_object(&self, console: &Console) -> Service {
450 let selector =
451 btreemap! {"materialize.cloud/name".to_string() => console.deployment_name()};
452
453 let ports = vec![ServicePort {
454 name: Some("http".to_string()),
455 protocol: Some("TCP".to_string()),
456 port: self.config.console_http_port.into(),
457 target_port: Some(IntOrString::Int(self.config.console_http_port.into())),
458 ..Default::default()
459 }];
460
461 let spec = ServiceSpec {
462 type_: Some("ClusterIP".to_string()),
463 cluster_ip: Some("None".to_string()),
464 selector: Some(selector),
465 ports: Some(ports),
466 ..Default::default()
467 };
468
469 Service {
470 metadata: console.managed_resource_meta(console.service_name()),
471 spec: Some(spec),
472 status: None,
473 }
474 }
475}
476
477#[async_trait::async_trait]
478impl k8s_controller::Context for Context {
479 type Resource = Console;
480 type Error = Error;
481
482 #[instrument(fields())]
483 async fn apply(
484 &self,
485 client: Client,
486 console: &Self::Resource,
487 ) -> Result<Option<Action>, Self::Error> {
488 if console.status.is_none() {
489 let console_api: Api<Console> =
490 Api::namespaced(client.clone(), &console.meta().namespace.clone().unwrap());
491 let mut new_console = console.clone();
492 new_console.status = Some(console.status());
493 console_api
494 .replace_status(
495 &console.name_unchecked(),
496 &PostParams::default(),
497 serde_json::to_vec(&new_console).unwrap(),
498 )
499 .await?;
500 return Ok(None);
503 }
504
505 let namespace = console.namespace().unwrap();
506 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), &namespace);
507 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), &namespace);
508 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
509 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
510 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
511
512 trace!("creating new network policies");
513 let network_policies = self.create_network_policies(console);
514 for network_policy in &network_policies {
515 apply_resource(&network_policy_api, network_policy).await?;
516 }
517
518 trace!("creating new console configmap");
519 let console_configmap = self.create_console_app_configmap_object(console);
520 apply_resource(&configmap_api, &console_configmap).await?;
521
522 trace!("creating new console deployment");
523 let console_deployment = self.create_console_deployment_object(console);
524 apply_resource(&deployment_api, &console_deployment).await?;
525
526 trace!("creating new console service");
527 let console_service = self.create_console_service_object(console);
528 apply_resource(&service_api, &console_service).await?;
529
530 let console_external_certificate = self.create_console_external_certificate(console);
531 if let Some(certificate) = &console_external_certificate {
532 trace!("creating new console external certificate");
533 apply_resource(&certificate_api, certificate).await?;
534 }
535
536 self.sync_deployment_status(&client, console).await?;
537
538 Ok(None)
539 }
540}