1use k8s_openapi::{
11 api::{
12 apps::v1::{Deployment, DeploymentSpec},
13 core::v1::{
14 Capabilities, ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, EnvVar,
15 HTTPGetAction, KeyToPath, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
16 SeccompProfile, SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec,
17 Volume, VolumeMount,
18 },
19 networking::v1::{
20 IPBlock, NetworkPolicy, NetworkPolicyIngressRule, NetworkPolicyPeer, NetworkPolicyPort,
21 NetworkPolicySpec,
22 },
23 },
24 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
25};
26use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
27use maplit::btreemap;
28use mz_server_core::listeners::AuthenticatorKind;
29use serde::Serialize;
30use tracing::trace;
31
32use crate::{
33 k8s::{apply_resource, delete_resource},
34 tls::{create_certificate, issuer_ref_defined},
35};
36use mz_cloud_resources::crd::{
37 ManagedResource,
38 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
39 materialize::v1alpha1::Materialize,
40};
41
42pub struct Resources {
43 network_policies: Vec<NetworkPolicy>,
44 console_configmap: Box<ConfigMap>,
45 console_deployment: Box<Deployment>,
46 console_service: Box<Service>,
47 console_external_certificate: Box<Option<Certificate>>,
48}
49
50impl Resources {
51 pub fn new(config: &super::Config, mz: &Materialize, console_image_ref: &str) -> Self {
52 let network_policies = create_network_policies(config, mz);
53 let console_configmap =
54 Box::new(create_console_app_configmap_object(mz, console_image_ref));
55 let console_deployment = Box::new(create_console_deployment_object(
56 config,
57 mz,
58 console_image_ref,
59 ));
60 let console_service = Box::new(create_console_service_object(config, mz));
61 let console_external_certificate =
62 Box::new(create_console_external_certificate(config, mz));
63 Self {
64 network_policies,
65 console_configmap,
66 console_deployment,
67 console_service,
68 console_external_certificate,
69 }
70 }
71
72 pub async fn apply(
73 &self,
74 client: &Client,
75 namespace: &str,
76 ) -> Result<Option<Action>, anyhow::Error> {
77 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
78 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
79 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
80 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
81 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
82
83 trace!("creating new network policies");
84 for network_policy in &self.network_policies {
85 apply_resource(&network_policy_api, network_policy).await?;
86 }
87
88 trace!("creating new console configmap");
89 apply_resource(&configmap_api, &self.console_configmap).await?;
90
91 trace!("creating new console deployment");
92 apply_resource(&deployment_api, &self.console_deployment).await?;
93
94 trace!("creating new console service");
95 apply_resource(&service_api, &self.console_service).await?;
96
97 if let Some(certificate) = &*self.console_external_certificate {
98 trace!("creating new console external certificate");
99 apply_resource(&certificate_api, certificate).await?;
100 }
101
102 Ok(None)
103 }
104
105 pub async fn cleanup(
106 &self,
107 client: &Client,
108 namespace: &str,
109 ) -> Result<Option<Action>, anyhow::Error> {
110 let network_policy_api: Api<NetworkPolicy> = Api::namespaced(client.clone(), namespace);
111 let configmap_api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
112 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
113 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
114 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
115
116 if let Some(certificate) = &*self.console_external_certificate {
117 trace!("deleting console external certificate");
118 delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
119 }
120
121 trace!("deleting console service");
122 delete_resource(&service_api, &self.console_service.name_unchecked()).await?;
123
124 trace!("deleting console deployment");
125 delete_resource(&deployment_api, &self.console_deployment.name_unchecked()).await?;
126
127 trace!("deleting console configmap");
128 delete_resource(&configmap_api, &self.console_configmap.name_unchecked()).await?;
129
130 trace!("deleting network policies");
131 for network_policy in &self.network_policies {
132 delete_resource(&network_policy_api, &network_policy.name_unchecked()).await?;
133 }
134
135 Ok(None)
136 }
137}
138
139fn create_network_policies(config: &super::Config, mz: &Materialize) -> Vec<NetworkPolicy> {
140 let mut network_policies = Vec::new();
141 if config.network_policies_ingress_enabled {
142 let console_label_selector = LabelSelector {
143 match_labels: Some(
144 mz.default_labels()
145 .into_iter()
146 .chain([("materialize.cloud/app".to_owned(), mz.console_app_name())])
147 .collect(),
148 ),
149 ..Default::default()
150 };
151 network_policies.extend([NetworkPolicy {
152 metadata: mz.managed_resource_meta(mz.name_prefixed("console-ingress")),
153 spec: Some(NetworkPolicySpec {
154 ingress: Some(vec![NetworkPolicyIngressRule {
155 from: Some(
156 config
157 .network_policies_ingress_cidrs
158 .iter()
159 .map(|cidr| NetworkPolicyPeer {
160 ip_block: Some(IPBlock {
161 cidr: cidr.to_owned(),
162 except: None,
163 }),
164 ..Default::default()
165 })
166 .collect(),
167 ),
168 ports: Some(vec![NetworkPolicyPort {
169 port: Some(IntOrString::Int(config.console_http_port.into())),
170 protocol: Some("TCP".to_string()),
171 ..Default::default()
172 }]),
173 ..Default::default()
174 }]),
175 pod_selector: Some(console_label_selector),
176 policy_types: Some(vec!["Ingress".to_owned()]),
177 ..Default::default()
178 }),
179 }]);
180 }
181 network_policies
182}
183
184fn create_console_external_certificate(
185 config: &super::Config,
186 mz: &Materialize,
187) -> Option<Certificate> {
188 create_certificate(
189 config.default_certificate_specs.console_external.clone(),
190 mz,
191 mz.spec.console_external_certificate_spec.clone(),
192 mz.console_external_certificate_name(),
193 mz.console_external_certificate_secret_name(),
194 None,
195 CertificatePrivateKeyAlgorithm::Rsa,
196 Some(4096),
197 )
198}
199
200#[derive(Serialize)]
201struct ConsoleAppConfig {
202 version: String,
203 auth: ConsoleAppConfigAuth,
204}
205
206#[derive(Serialize)]
207struct ConsoleAppConfigAuth {
208 mode: AuthenticatorKind,
209}
210
211fn create_console_app_configmap_object(mz: &Materialize, console_image_ref: &str) -> ConfigMap {
212 let version: String = console_image_ref
213 .rsplitn(2, ':')
214 .next()
215 .expect("at least one chunk, even if empty")
216 .to_owned();
217 let app_config_json = serde_json::to_string(&ConsoleAppConfig {
218 version,
219 auth: ConsoleAppConfigAuth {
220 mode: mz.spec.authenticator_kind,
221 },
222 })
223 .expect("known valid");
224 ConfigMap {
225 binary_data: None,
226 data: Some(btreemap! {
227 "app-config.json".to_owned() => app_config_json,
228 }),
229 immutable: None,
230 metadata: mz.managed_resource_meta(mz.console_configmap_name()),
231 }
232}
233
234fn create_console_deployment_object(
235 config: &super::Config,
236 mz: &Materialize,
237 console_image_ref: &str,
238) -> Deployment {
239 let mut pod_template_labels = mz.default_labels();
240 pod_template_labels.insert(
241 "materialize.cloud/name".to_owned(),
242 mz.console_deployment_name(),
243 );
244 pod_template_labels.insert("app".to_owned(), "console".to_string());
245 pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.console_app_name());
246
247 let ports = vec![ContainerPort {
248 container_port: config.console_http_port.into(),
249 name: Some("http".into()),
250 protocol: Some("TCP".into()),
251 ..Default::default()
252 }];
253
254 let scheme = if issuer_ref_defined(
255 &config.default_certificate_specs.balancerd_external,
256 &mz.spec.balancerd_external_certificate_spec,
257 ) {
258 "https"
259 } else {
260 "http"
261 };
262 let mut env = vec![EnvVar {
263 name: "MZ_ENDPOINT".to_string(),
264 value: Some(format!(
265 "{}://{}.{}.svc.cluster.local:{}",
266 scheme,
267 mz.balancerd_service_name(),
268 mz.namespace(),
269 config.balancerd_http_port,
270 )),
271 ..Default::default()
272 }];
273 let mut volumes = vec![Volume {
274 name: "app-config".to_string(),
275 config_map: Some(ConfigMapVolumeSource {
276 name: mz.console_configmap_name(),
277 default_mode: Some(256),
278 optional: Some(false),
279 items: Some(vec![KeyToPath {
280 key: "app-config.json".to_string(),
281 path: "app-config.json".to_string(),
282 ..Default::default()
283 }]),
284 }),
285 ..Default::default()
286 }];
287 let mut volume_mounts = vec![VolumeMount {
288 name: "app-config".to_string(),
289 mount_path: "/usr/share/nginx/html/app-config".to_string(),
290 ..Default::default()
291 }];
292
293 let scheme = if issuer_ref_defined(
294 &config.default_certificate_specs.console_external,
295 &mz.spec.console_external_certificate_spec,
296 ) {
297 volumes.push(Volume {
298 name: "external-certificate".to_owned(),
299 secret: Some(SecretVolumeSource {
300 default_mode: Some(0o400),
301 secret_name: Some(mz.console_external_certificate_secret_name()),
302 items: None,
303 optional: Some(false),
304 }),
305 ..Default::default()
306 });
307 volume_mounts.push(VolumeMount {
308 name: "external-certificate".to_owned(),
309 mount_path: "/nginx/tls".to_owned(),
310 read_only: Some(true),
311 ..Default::default()
312 });
313 env.push(EnvVar {
314 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
315 value: Some(format!(
316 "listen {} ssl;
317ssl_certificate /nginx/tls/tls.crt;
318ssl_certificate_key /nginx/tls/tls.key;",
319 config.console_http_port
320 )),
321 ..Default::default()
322 });
323 Some("HTTPS".to_owned())
324 } else {
325 env.push(EnvVar {
326 name: "MZ_NGINX_LISTENER_CONFIG".to_string(),
327 value: Some(format!("listen {};", config.console_http_port)),
328 ..Default::default()
329 });
330 Some("HTTP".to_owned())
331 };
332
333 let probe = Probe {
334 http_get: Some(HTTPGetAction {
335 path: Some("/".to_string()),
336 port: IntOrString::Int(config.console_http_port.into()),
337 scheme,
338 ..Default::default()
339 }),
340 ..Default::default()
341 };
342
343 let security_context = if config.enable_security_context {
344 Some(SecurityContext {
348 run_as_non_root: Some(true),
349 capabilities: Some(Capabilities {
350 drop: Some(vec!["ALL".to_string()]),
351 ..Default::default()
352 }),
353 seccomp_profile: Some(SeccompProfile {
354 type_: "RuntimeDefault".to_string(),
355 ..Default::default()
356 }),
357 allow_privilege_escalation: Some(false),
358 ..Default::default()
359 })
360 } else {
361 None
362 };
363
364 let container = Container {
365 name: "console".to_owned(),
366 image: Some(console_image_ref.to_string()),
367 image_pull_policy: Some(config.image_pull_policy.to_string()),
368 ports: Some(ports),
369 env: Some(env),
370 startup_probe: Some(Probe {
371 period_seconds: Some(1),
372 failure_threshold: Some(10),
373 ..probe.clone()
374 }),
375 readiness_probe: Some(Probe {
376 period_seconds: Some(30),
377 failure_threshold: Some(1),
378 ..probe.clone()
379 }),
380 liveness_probe: Some(Probe {
381 period_seconds: Some(30),
382 ..probe.clone()
383 }),
384 resources: mz
385 .spec
386 .console_resource_requirements
387 .clone()
388 .or_else(|| config.console_default_resources.clone()),
389 security_context,
390 volume_mounts: Some(volume_mounts),
391 ..Default::default()
392 };
393
394 let deployment_spec = DeploymentSpec {
395 replicas: Some(mz.console_replicas()),
396 selector: LabelSelector {
397 match_labels: Some(pod_template_labels.clone()),
398 ..Default::default()
399 },
400 template: PodTemplateSpec {
401 metadata: Some(ObjectMeta {
404 labels: Some(pod_template_labels),
405 ..Default::default()
406 }),
407 spec: Some(PodSpec {
408 containers: vec![container],
409 node_selector: Some(
410 config
411 .console_node_selector
412 .iter()
413 .map(|selector| (selector.key.clone(), selector.value.clone()))
414 .collect(),
415 ),
416 affinity: config.console_affinity.clone(),
417 tolerations: config.console_tolerations.clone(),
418 scheduler_name: config.scheduler_name.clone(),
419 service_account_name: Some(mz.service_account_name()),
420 volumes: Some(volumes),
421 security_context: Some(PodSecurityContext {
422 fs_group: Some(101),
423 ..Default::default()
424 }),
425 ..Default::default()
426 }),
427 },
428 ..Default::default()
429 };
430
431 Deployment {
432 metadata: ObjectMeta {
433 ..mz.managed_resource_meta(mz.console_deployment_name())
434 },
435 spec: Some(deployment_spec),
436 status: None,
437 }
438}
439
440fn create_console_service_object(config: &super::Config, mz: &Materialize) -> Service {
441 let selector = btreemap! {"materialize.cloud/name".to_string() => mz.console_deployment_name()};
442
443 let ports = vec![ServicePort {
444 name: Some("http".to_string()),
445 protocol: Some("TCP".to_string()),
446 port: config.console_http_port.into(),
447 target_port: Some(IntOrString::Int(config.console_http_port.into())),
448 ..Default::default()
449 }];
450
451 let spec = ServiceSpec {
452 type_: Some("ClusterIP".to_string()),
453 cluster_ip: Some("None".to_string()),
454 selector: Some(selector),
455 ports: Some(ports),
456 ..Default::default()
457 };
458
459 Service {
460 metadata: mz.managed_resource_meta(mz.console_service_name()),
461 spec: Some(spec),
462 status: None,
463 }
464}