1use k8s_openapi::{
11 api::{
12 apps::v1::{Deployment, DeploymentSpec},
13 core::v1::{
14 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
15 HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16 SeccompProfile, SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec,
17 Volume, VolumeMount,
18 },
19 networking::v1::{
20 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21 NetworkPolicySpec,
22 },
23 },
24 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
25};
26use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
27use maplit::btreemap;
28use mz_server_core::listeners::AuthenticatorKind;
29use serde::Serialize;
30use tracing::trace;
31
32use crate::{
33 controller::materialize::tls::{create_certificate, issuer_ref_defined},
34 k8s::{apply_resource, delete_resource},
35};
36use mz_cloud_resources::crd::{
37 generated::cert_manager::certificates::Certificate, materialize::v1alpha1::Materialize,
38};
39
40pub struct Resources {
41 network_policies: Vec<NetworkPolicy>,
42 console_configmap: Box<ConfigMap>,
43 console_deployment: Box<Deployment>,
44 console_service: Box<Service>,
45 console_external_certificate: Box<Option<Certificate>>,
46}
47
48impl Resources {
49 pub fn new(
50 config: &super::MaterializeControllerArgs,
51 mz: &Materialize,
52 console_image_ref: &str,
53 ) -> Self {
54 let network_policies = create_network_policies(config, mz);
55 let console_configmap =
56 Box::new(create_console_app_configmap_object(mz, console_image_ref));
57 let console_deployment = Box::new(create_console_deployment_object(
58 config,
59 mz,
60 console_image_ref,
61 ));
62 let console_service = Box::new(create_console_service_object(config, mz));
63 let console_external_certificate =
64 Box::new(create_console_external_certificate(config, mz));
65 Self {
66 network_policies,
67 console_configmap,
68 console_deployment,
69 console_service,
70 console_external_certificate,
71 }
72 }
73
74 pub async fn apply(
75 &self,
76 client: &Client,
77 namespace: &str,
78 ) -> Result<Option<Action>, anyhow::Error> {
79 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
80 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
81 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
82 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
83 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
84
85 trace!("creating new network policies");
86 for network_policy in &self.network_policies {
87 apply_resource(&network_policy_api, network_policy).await?;
88 }
89
90 trace!("creating new console configmap");
91 apply_resource(&configmap_api, &self.console_configmap).await?;
92
93 trace!("creating new console deployment");
94 apply_resource(&deployment_api, &self.console_deployment).await?;
95
96 trace!("creating new console service");
97 apply_resource(&service_api, &self.console_service).await?;
98
99 if let Some(certificate) = &*self.console_external_certificate {
100 trace!("creating new console external certificate");
101 apply_resource(&certificate_api, certificate).await?;
102 }
103
104 Ok(None)
105 }
106
107 pub async fn cleanup(
108 &self,
109 client: &Client,
110 namespace: &str,
111 ) -> Result<Option<Action>, anyhow::Error> {
112 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
113 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
114 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
115 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
116 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
117
118 if let Some(certificate) = &*self.console_external_certificate {
119 trace!("deleting console external certificate");
120 delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
121 }
122
123 trace!("deleting console service");
124 delete_resource(&service_api, &self.console_service.name_unchecked()).await?;
125
126 trace!("deleting console deployment");
127 delete_resource(&deployment_api, &self.console_deployment.name_unchecked()).await?;
128
129 trace!("deleting console configmap");
130 delete_resource(&configmap_api, &self.console_configmap.name_unchecked()).await?;
131
132 trace!("deleting network policies");
133 for network_policy in &self.network_policies {
134 delete_resource(&network_policy_api, &network_policy.name_unchecked()).await?;
135 }
136
137 Ok(None)
138 }
139}
140
141fn create_network_policies(
142 config: &super::MaterializeControllerArgs,
143 mz: &Materialize,
144) -> Vec<NetworkPolicy> {
145 let mut network_policies = Vec::new();
146 if config.network_policies.ingress_enabled {
147 let console_label_selector = LabelSelector {
148 match_labels: Some(
149 mz.default_labels()
150 .into_iter()
151 .chain([("materialize.cloud/app".to_owned(), mz.console_app_name())])
152 .collect(),
153 ),
154 ..Default::default()
155 };
156 network_policies.extend([NetworkPolicy {
157 metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")),
158 spec: Some(NetworkPolicySpec {
159 ingress: Some(vec![NetworkPolicyIngressRule {
160 from: Some(
161 config
162 .network_policies
163 .ingress_cidrs
164 .iter()
165 .map(|cidr| NetworkPolicyPeer {
166 ip_block: Some(IPBlock {
167 cidr: cidr.to_owned(),
168 except: None,
169 }),
170 ..Default::default()
171 })
172 .collect(),
173 ),
174 ports: Some(vec![NetworkPolicyPort {
175 port: Some(IntOrString::Int(config.console_http_port.into())),
176 protocol: Some("TCP".to_string()),
177 ..Default::default()
178 }]),
179 ..Default::default()
180 }]),
181 pod_selector: console_label_selector,
182 policy_types: Some(vec!["Ingress".to_owned()]),
183 ..Default::default()
184 }),
185 }]);
186 }
187 network_policies
188}
189
190fn create_console_external_certificate(
191 config: &super::MaterializeControllerArgs,
192 mz: &Materialize,
193) -> Option<Certificate> {
194 create_certificate(
195 config.default_certificate_specs.console_external.clone(),
196 mz,
197 mz.spec.console_external_certificate_spec.clone(),
198 mz.console_external_certificate_name(),
199 mz.console_external_certificate_secret_name(),
200 None,
201 )
202}
203
204#[derive(Serialize)]
205struct ConsoleAppConfig {
206 version: String,
207 auth: ConsoleAppConfigAuth,
208}
209
210#[derive(Serialize)]
211struct ConsoleAppConfigAuth {
212 mode: AuthenticatorKind,
213}
214
215fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str) -> ConfigMap {
216 let version: String = console_image_ref
217 .rsplitn(2, ':')
218 .next()
219 .expect("at least one chunk, even if empty")
220 .to_owned();
221 let app_config_json = serde_json::to_string(&ConsoleAppConfig {
222 version,
223 auth: ConsoleAppConfigAuth {
224 mode: mz.spec.authenticator_kind,
225 },
226 })
227 .expect("known valid");
228 ConfigMap {
229 binary_data: None,
230 data: Some(btreemap! {
231 "app-config.json".to_owned() => app_config_json,
232 }),
233 immutable: None,
234 metadata: mz.managed_resource_meta(mz.console_configmap_name()),
235 }
236}
237
238fn create_console_deployment_object(
239 config: &super::MaterializeControllerArgs,
240 mz: &Materialize,
241 console_image_ref: &str,
242) -> Deployment {
243 let mut pod_template_labels = mz.default_labels();
244 pod_template_labels.insert(
245 "materialize.cloud/name".to_owned(),
246 mz.console_deployment_name(),
247 );
248 pod_template_labels.insert("app".to_owned(), "console".to_string());
249 pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.console_app_name());
250
251 let ports = vec![ContainerPort {
252 container_port: config.console_http_port.into(),
253 name: Some("http".into()),
254 protocol: Some("TCP".into()),
255 ..Default::default()
256 }];
257
258 let scheme = if issuer_ref_defined(
259 &config.default_certificate_specs.balancerd_external,
260 &mz.spec.balancerd_external_certificate_spec,
261 ) {
262 "https"
263 } else {
264 "http"
265 };
266 let mut env = vec![EnvVar {
267 name: "MZ_ENDPOINT".to_string(),
268 value: Some(format!(
269 "{}://{}.{}.svc.cluster.local:{}",
270 scheme,
271 mz.balancerd_service_name(),
272 mz.namespace(),
273 config.balancerd_http_port,
274 )),
275 ..Default::default()
276 }];
277 let mut volumes = vec![Volume {
278 name: "app-config".to_string(),
279 config_map: Some(ConfigMapVolumeSource {
280 name: mz.console_configmap_name(),
281 default_mode: Some(256),
282 optional: Some(false),
283 items: Some(vec![KeyToPath {
284 key: "app-config.json".to_string(),
285 path: "app-config.json".to_string(),
286 ..Default::default()
287 }]),
288 }),
289 ..Default::default()
290 }];
291 let mut volume_mounts = vec![VolumeMount {
292 name: "app-config".to_string(),
293 mount_path: "/usr/share/nginx/html/app-config".to_string(),
294 ..Default::default()
295 }];
296
297 let scheme = if issuer_ref_defined(
298 &config.default_certificate_specs.console_external,
299 &mz.spec.console_external_certificate_spec,
300 ) {
301 volumes.push(Volume {
302 name: "external-certificate".to_owned(),
303 secret: Some(SecretVolumeSource {
304 default_mode: Some(0o400),
305 secret_name: Some(mz.console_external_certificate_secret_name()),
306 items: None,
307 optional: Some(false),
308 }),
309 ..Default::default()
310 });
311 volume_mounts.push(VolumeMount {
312 name: "external-certificate".to_owned(),
313 mount_path: "/nginx/tls".to_owned(),
314 read_only: Some(true),
315 ..Default::default()
316 });
317 env.push(EnvVar {
318 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
319 value: Some(format!(
320 "listen {} ssl;
321ssl_certificate /nginx/tls/tls.crt;
322ssl_certificate_key /nginx/tls/tls.key;",
323 config.console_http_port
324 )),
325 ..Default::default()
326 });
327 Some("HTTPS".to_owned())
328 } else {
329 env.push(EnvVar {
330 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
331 value: Some(format!("listen {};", config.console_http_port)),
332 ..Default::default()
333 });
334 Some("HTTP".to_owned())
335 };
336
337 let probe = Probe {
338 http_get: Some(HTTPGetAction {
339 path: Some("/".to_string()),
340 port: IntOrString::Int(config.console_http_port.into()),
341 scheme,
342 ..Default::default()
343 }),
344 ..Default::default()
345 };
346
347 let security_context = if config.enable_security_context {
348 Some(SecurityContext {
352 run_as_non_root: Some(true),
353 capabilities: Some(Capabilities {
354 drop: Some(vec!["ALL".to_string()]),
355 ..Default::default()
356 }),
357 seccomp_profile: Some(SeccompProfile {
358 type_: "RuntimeDefault".to_string(),
359 ..Default::default()
360 }),
361 allow_privilege_escalation: Some(false),
362 ..Default::default()
363 })
364 } else {
365 None
366 };
367
368 let container = Container {
369 name: "console".to_owned(),
370 image: Some(console_image_ref.to_string()),
371 image_pull_policy: Some(config.image_pull_policy.to_string()),
372 ports: Some(ports),
373 env: Some(env),
374 startup_probe: Some(Probe {
375 period_seconds: Some(1),
376 failure_threshold: Some(10),
377 ..probe.clone()
378 }),
379 readiness_probe: Some(Probe {
380 period_seconds: Some(30),
381 failure_threshold: Some(1),
382 ..probe.clone()
383 }),
384 liveness_probe: Some(Probe {
385 period_seconds: Some(30),
386 ..probe.clone()
387 }),
388 resources: mz.spec.console_resource_requirements.clone(),
389 security_context,
390 volume_mounts: Some(volume_mounts),
391 ..Default::default()
392 };
393
394 let deployment_spec = DeploymentSpec {
395 replicas: Some(2),
396 selector: LabelSelector {
397 match_labels: Some(pod_template_labels.clone()),
398 ..Default::default()
399 },
400 template: PodTemplateSpec {
401 metadata: Some(ObjectMeta {
404 labels: Some(pod_template_labels),
405 ..Default::default()
406 }),
407 spec: Some(PodSpec {
408 containers: vec![container],
409 node_selector: Some(
410 config
411 .console_node_selector
412 .iter()
413 .map(|selector| (selector.key.clone(), selector.value.clone()))
414 .collect(),
415 ),
416 affinity: config.console_affinity.clone(),
417 tolerations: config.console_tolerations.clone(),
418 scheduler_name: config.scheduler_name.clone(),
419 service_account_name: Some(mz.service_account_name()),
420 volumes: Some(volumes),
421 security_context: Some(PodSecurityContext {
422 fs_group: Some(101),
423 ..Default::default()
424 }),
425 ..Default::default()
426 }),
427 },
428 ..Default::default()
429 };
430
431 Deployment {
432 metadata: ObjectMeta {
433 ..mz.managed_resource_meta(mz.console_deployment_name())
434 },
435 spec: Some(deployment_spec),
436 status: None,
437 }
438}
439
440fn create_console_service_object(
441 config: &super::MaterializeControllerArgs,
442 mz: &Materialize,
443) -> Service {
444 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()};
445
446 let ports = vec![ServicePort {
447 name: Some("http".to_string()),
448 protocol: Some("TCP".to_string()),
449 port: config.console_http_port.into(),
450 target_port: Some(IntOrString::Int(config.console_http_port.into())),
451 ..Default::default()
452 }];
453
454 let spec = ServiceSpec {
455 type_: Some("ClusterIP".to_string()),
456 cluster_ip: Some("None".to_string()),
457 selector: Some(selector),
458 ports: Some(ports),
459 ..Default::default()
460 };
461
462 Service {
463 metadata: mz.managed_resource_meta(mz.console_service_name()),
464 spec: Some(spec),
465 status: None,
466 }
467}