1use k8s_openapi::{
11 api::{
12 apps::v1::{Deployment, DeploymentSpec},
13 core::v1::{
14 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
15 HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16 SeccompProfile, SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec,
17 Volume, VolumeMount,
18 },
19 networking::v1::{
20 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21 NetworkPolicySpec,
22 },
23 },
24 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
25};
26use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
27use maplit::btreemap;
28use mz_server_core::listeners::AuthenticatorKind;
29use serde::Serialize;
30use tracing::trace;
31
32use crate::{
33 controller::materialize::tls::{create_certificate, issuer_ref_defined},
34 k8s::{apply_resource, delete_resource},
35};
36use mz_cloud_resources::crd::{
37 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
38 materialize::v1alpha1::Materialize,
39};
40
41pub struct Resources {
42 network_policies: Vec<NetworkPolicy>,
43 console_configmap: Box<ConfigMap>,
44 console_deployment: Box<Deployment>,
45 console_service: Box<Service>,
46 console_external_certificate: Box<Option<Certificate>>,
47}
48
49impl Resources {
50 pub fn new(
51 config: &super::MaterializeControllerArgs,
52 mz: &Materialize,
53 console_image_ref: &str,
54 ) -> Self {
55 let network_policies = create_network_policies(config, mz);
56 let console_configmap =
57 Box::new(create_console_app_configmap_object(mz, console_image_ref));
58 let console_deployment = Box::new(create_console_deployment_object(
59 config,
60 mz,
61 console_image_ref,
62 ));
63 let console_service = Box::new(create_console_service_object(config, mz));
64 let console_external_certificate =
65 Box::new(create_console_external_certificate(config, mz));
66 Self {
67 network_policies,
68 console_configmap,
69 console_deployment,
70 console_service,
71 console_external_certificate,
72 }
73 }
74
75 pub async fn apply(
76 &self,
77 client: &Client,
78 namespace: &str,
79 ) -> Result<Option<Action>, anyhow::Error> {
80 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
81 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
82 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
83 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
84 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
85
86 trace!("creating new network policies");
87 for network_policy in &self.network_policies {
88 apply_resource(&network_policy_api, network_policy).await?;
89 }
90
91 trace!("creating new console configmap");
92 apply_resource(&configmap_api, &self.console_configmap).await?;
93
94 trace!("creating new console deployment");
95 apply_resource(&deployment_api, &self.console_deployment).await?;
96
97 trace!("creating new console service");
98 apply_resource(&service_api, &self.console_service).await?;
99
100 if let Some(certificate) = &*self.console_external_certificate {
101 trace!("creating new console external certificate");
102 apply_resource(&certificate_api, certificate).await?;
103 }
104
105 Ok(None)
106 }
107
108 pub async fn cleanup(
109 &self,
110 client: &Client,
111 namespace: &str,
112 ) -> Result<Option<Action>, anyhow::Error> {
113 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
114 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
115 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
116 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
117 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
118
119 if let Some(certificate) = &*self.console_external_certificate {
120 trace!("deleting console external certificate");
121 delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
122 }
123
124 trace!("deleting console service");
125 delete_resource(&service_api, &self.console_service.name_unchecked()).await?;
126
127 trace!("deleting console deployment");
128 delete_resource(&deployment_api, &self.console_deployment.name_unchecked()).await?;
129
130 trace!("deleting console configmap");
131 delete_resource(&configmap_api, &self.console_configmap.name_unchecked()).await?;
132
133 trace!("deleting network policies");
134 for network_policy in &self.network_policies {
135 delete_resource(&network_policy_api, &network_policy.name_unchecked()).await?;
136 }
137
138 Ok(None)
139 }
140}
141
142fn create_network_policies(
143 config: &super::MaterializeControllerArgs,
144 mz: &Materialize,
145) -> Vec<NetworkPolicy> {
146 let mut network_policies = Vec::new();
147 if config.network_policies.ingress_enabled {
148 let console_label_selector = LabelSelector {
149 match_labels: Some(
150 mz.default_labels()
151 .into_iter()
152 .chain([("materialize.cloud/app".to_owned(), mz.console_app_name())])
153 .collect(),
154 ),
155 ..Default::default()
156 };
157 network_policies.extend([NetworkPolicy {
158 metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")),
159 spec: Some(NetworkPolicySpec {
160 ingress: Some(vec![NetworkPolicyIngressRule {
161 from: Some(
162 config
163 .network_policies
164 .ingress_cidrs
165 .iter()
166 .map(|cidr| NetworkPolicyPeer {
167 ip_block: Some(IPBlock {
168 cidr: cidr.to_owned(),
169 except: None,
170 }),
171 ..Default::default()
172 })
173 .collect(),
174 ),
175 ports: Some(vec![NetworkPolicyPort {
176 port: Some(IntOrString::Int(config.console_http_port.into())),
177 protocol: Some("TCP".to_string()),
178 ..Default::default()
179 }]),
180 ..Default::default()
181 }]),
182 pod_selector: Some(console_label_selector),
183 policy_types: Some(vec!["Ingress".to_owned()]),
184 ..Default::default()
185 }),
186 }]);
187 }
188 network_policies
189}
190
191fn create_console_external_certificate(
192 config: &super::MaterializeControllerArgs,
193 mz: &Materialize,
194) -> Option<Certificate> {
195 create_certificate(
196 config.default_certificate_specs.console_external.clone(),
197 mz,
198 mz.spec.console_external_certificate_spec.clone(),
199 mz.console_external_certificate_name(),
200 mz.console_external_certificate_secret_name(),
201 None,
202 CertificatePrivateKeyAlgorithm::Rsa,
203 Some(4096),
204 )
205}
206
207#[derive(Serialize)]
208struct ConsoleAppConfig {
209 version: String,
210 auth: ConsoleAppConfigAuth,
211}
212
213#[derive(Serialize)]
214struct ConsoleAppConfigAuth {
215 mode: AuthenticatorKind,
216}
217
218fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str) -> ConfigMap {
219 let version: String = console_image_ref
220 .rsplitn(2, ':')
221 .next()
222 .expect("at least one chunk, even if empty")
223 .to_owned();
224 let app_config_json = serde_json::to_string(&ConsoleAppConfig {
225 version,
226 auth: ConsoleAppConfigAuth {
227 mode: mz.spec.authenticator_kind,
228 },
229 })
230 .expect("known valid");
231 ConfigMap {
232 binary_data: None,
233 data: Some(btreemap! {
234 "app-config.json".to_owned() => app_config_json,
235 }),
236 immutable: None,
237 metadata: mz.managed_resource_meta(mz.console_configmap_name()),
238 }
239}
240
241fn create_console_deployment_object(
242 config: &super::MaterializeControllerArgs,
243 mz: &Materialize,
244 console_image_ref: &str,
245) -> Deployment {
246 let mut pod_template_labels = mz.default_labels();
247 pod_template_labels.insert(
248 "materialize.cloud/name".to_owned(),
249 mz.console_deployment_name(),
250 );
251 pod_template_labels.insert("app".to_owned(), "console".to_string());
252 pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.console_app_name());
253
254 let ports = vec![ContainerPort {
255 container_port: config.console_http_port.into(),
256 name: Some("http".into()),
257 protocol: Some("TCP".into()),
258 ..Default::default()
259 }];
260
261 let scheme = if issuer_ref_defined(
262 &config.default_certificate_specs.balancerd_external,
263 &mz.spec.balancerd_external_certificate_spec,
264 ) {
265 "https"
266 } else {
267 "http"
268 };
269 let mut env = vec![EnvVar {
270 name: "MZ_ENDPOINT".to_string(),
271 value: Some(format!(
272 "{}://{}.{}.svc.cluster.local:{}",
273 scheme,
274 mz.balancerd_service_name(),
275 mz.namespace(),
276 config.balancerd_http_port,
277 )),
278 ..Default::default()
279 }];
280 let mut volumes = vec![Volume {
281 name: "app-config".to_string(),
282 config_map: Some(ConfigMapVolumeSource {
283 name: mz.console_configmap_name(),
284 default_mode: Some(256),
285 optional: Some(false),
286 items: Some(vec![KeyToPath {
287 key: "app-config.json".to_string(),
288 path: "app-config.json".to_string(),
289 ..Default::default()
290 }]),
291 }),
292 ..Default::default()
293 }];
294 let mut volume_mounts = vec![VolumeMount {
295 name: "app-config".to_string(),
296 mount_path: "/usr/share/nginx/html/app-config".to_string(),
297 ..Default::default()
298 }];
299
300 let scheme = if issuer_ref_defined(
301 &config.default_certificate_specs.console_external,
302 &mz.spec.console_external_certificate_spec,
303 ) {
304 volumes.push(Volume {
305 name: "external-certificate".to_owned(),
306 secret: Some(SecretVolumeSource {
307 default_mode: Some(0o400),
308 secret_name: Some(mz.console_external_certificate_secret_name()),
309 items: None,
310 optional: Some(false),
311 }),
312 ..Default::default()
313 });
314 volume_mounts.push(VolumeMount {
315 name: "external-certificate".to_owned(),
316 mount_path: "/nginx/tls".to_owned(),
317 read_only: Some(true),
318 ..Default::default()
319 });
320 env.push(EnvVar {
321 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
322 value: Some(format!(
323 "listen {} ssl;
324ssl_certificate /nginx/tls/tls.crt;
325ssl_certificate_key /nginx/tls/tls.key;",
326 config.console_http_port
327 )),
328 ..Default::default()
329 });
330 Some("HTTPS".to_owned())
331 } else {
332 env.push(EnvVar {
333 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
334 value: Some(format!("listen {};", config.console_http_port)),
335 ..Default::default()
336 });
337 Some("HTTP".to_owned())
338 };
339
340 let probe = Probe {
341 http_get: Some(HTTPGetAction {
342 path: Some("/".to_string()),
343 port: IntOrString::Int(config.console_http_port.into()),
344 scheme,
345 ..Default::default()
346 }),
347 ..Default::default()
348 };
349
350 let security_context = if config.enable_security_context {
351 Some(SecurityContext {
355 run_as_non_root: Some(true),
356 capabilities: Some(Capabilities {
357 drop: Some(vec!["ALL".to_string()]),
358 ..Default::default()
359 }),
360 seccomp_profile: Some(SeccompProfile {
361 type_: "RuntimeDefault".to_string(),
362 ..Default::default()
363 }),
364 allow_privilege_escalation: Some(false),
365 ..Default::default()
366 })
367 } else {
368 None
369 };
370
371 let container = Container {
372 name: "console".to_owned(),
373 image: Some(console_image_ref.to_string()),
374 image_pull_policy: Some(config.image_pull_policy.to_string()),
375 ports: Some(ports),
376 env: Some(env),
377 startup_probe: Some(Probe {
378 period_seconds: Some(1),
379 failure_threshold: Some(10),
380 ..probe.clone()
381 }),
382 readiness_probe: Some(Probe {
383 period_seconds: Some(30),
384 failure_threshold: Some(1),
385 ..probe.clone()
386 }),
387 liveness_probe: Some(Probe {
388 period_seconds: Some(30),
389 ..probe.clone()
390 }),
391 resources: mz
392 .spec
393 .console_resource_requirements
394 .clone()
395 .or_else(|| config.console_default_resources.clone()),
396 security_context,
397 volume_mounts: Some(volume_mounts),
398 ..Default::default()
399 };
400
401 let deployment_spec = DeploymentSpec {
402 replicas: Some(mz.console_replicas()),
403 selector: LabelSelector {
404 match_labels: Some(pod_template_labels.clone()),
405 ..Default::default()
406 },
407 template: PodTemplateSpec {
408 metadata: Some(ObjectMeta {
411 labels: Some(pod_template_labels),
412 ..Default::default()
413 }),
414 spec: Some(PodSpec {
415 containers: vec![container],
416 node_selector: Some(
417 config
418 .console_node_selector
419 .iter()
420 .map(|selector| (selector.key.clone(), selector.value.clone()))
421 .collect(),
422 ),
423 affinity: config.console_affinity.clone(),
424 tolerations: config.console_tolerations.clone(),
425 scheduler_name: config.scheduler_name.clone(),
426 service_account_name: Some(mz.service_account_name()),
427 volumes: Some(volumes),
428 security_context: Some(PodSecurityContext {
429 fs_group: Some(101),
430 ..Default::default()
431 }),
432 ..Default::default()
433 }),
434 },
435 ..Default::default()
436 };
437
438 Deployment {
439 metadata: ObjectMeta {
440 ..mz.managed_resource_meta(mz.console_deployment_name())
441 },
442 spec: Some(deployment_spec),
443 status: None,
444 }
445}
446
447fn create_console_service_object(
448 config: &super::MaterializeControllerArgs,
449 mz: &Materialize,
450) -> Service {
451 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()};
452
453 let ports = vec![ServicePort {
454 name: Some("http".to_string()),
455 protocol: Some("TCP".to_string()),
456 port: config.console_http_port.into(),
457 target_port: Some(IntOrString::Int(config.console_http_port.into())),
458 ..Default::default()
459 }];
460
461 let spec = ServiceSpec {
462 type_: Some("ClusterIP".to_string()),
463 cluster_ip: Some("None".to_string()),
464 selector: Some(selector),
465 ports: Some(ports),
466 ..Default::default()
467 };
468
469 Service {
470 metadata: mz.managed_resource_meta(mz.console_service_name()),
471 spec: Some(spec),
472 status: None,
473 }
474}