1use std::collections::BTreeMap;
11
12use k8s_openapi::{
13 api::{
14 core::v1::ServiceAccount,
15 networking::v1::{
16 IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
17 NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
18 },
19 rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
20 },
21 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use serde::Serialize;
26use tracing::{trace, warn};
27
28use super::Error;
29use crate::k8s::apply_resource;
30use crate::tls::create_certificate;
31use mz_cloud_provider::CloudProvider;
32use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
33use mz_cloud_resources::crd::{
34 ManagedResource,
35 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
36};
37use mz_ore::instrument;
38
39#[derive(Debug, Serialize)]
40pub struct Resources {
41 pub environmentd_network_policies: Vec<NetworkPolicy>,
42 pub service_account: Box<Option<ServiceAccount>>,
43 pub role: Box<Role>,
44 pub role_binding: Box<RoleBinding>,
45 pub environmentd_certificate: Box<Option<Certificate>>,
46}
47
48impl Resources {
49 pub fn new(config: &super::Config, mz: &Materialize) -> Self {
50 let environmentd_network_policies = create_environmentd_network_policies(config, mz);
51
52 let service_account = Box::new(create_service_account_object(config, mz));
53 let role = Box::new(create_role_object(mz));
54 let role_binding = Box::new(create_role_binding_object(mz));
55 let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
56
57 Self {
58 environmentd_network_policies,
59 service_account,
60 role,
61 role_binding,
62 environmentd_certificate,
63 }
64 }
65
66 #[instrument]
67 pub async fn apply(&self, client: &Client, namespace: &str) -> Result<Option<Action>, Error> {
68 let environmentd_network_policy_api: Api<NetworkPolicy> =
69 Api::namespaced(client.clone(), namespace);
70 let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
71 let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
72 let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
73 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
74
75 for policy in &self.environmentd_network_policies {
76 trace!("applying network policy {}", policy.name_unchecked());
77 apply_resource(&environmentd_network_policy_api, policy).await?;
78 }
79
80 if let Some(service_account) = &*self.service_account {
81 trace!("applying environmentd service account");
82 apply_resource(&service_account_api, service_account).await?;
83 }
84
85 trace!("applying environmentd role");
86 apply_resource(&role_api, &*self.role).await?;
87
88 trace!("applying environmentd role binding");
89 apply_resource(&role_binding_api, &*self.role_binding).await?;
90
91 if let Some(certificate) = &*self.environmentd_certificate {
92 trace!("creating new environmentd certificate");
93 apply_resource(&certificate_api, certificate).await?;
94 }
95
96 Ok(None)
97 }
98}
99
100fn create_environmentd_network_policies(
101 config: &super::Config,
102 mz: &Materialize,
103) -> Vec<NetworkPolicy> {
104 let mut network_policies = Vec::new();
105 if config.network_policies_internal_enabled {
106 let environmentd_label_selector = LabelSelector {
107 match_labels: Some(
108 mz.default_labels()
109 .into_iter()
110 .chain([(
111 "materialize.cloud/app".to_owned(),
112 mz.environmentd_app_name(),
113 )])
114 .collect(),
115 ),
116 ..Default::default()
117 };
118 let orchestratord_label_selector = LabelSelector {
119 match_labels: Some(
120 config
121 .orchestratord_pod_selector_labels
122 .iter()
123 .cloned()
124 .map(|kv| (kv.key, kv.value))
125 .collect(),
126 ),
127 ..Default::default()
128 };
129 let all_pods_label_selector = LabelSelector {
132 match_labels: Some(
138 [(
139 "materialize.cloud/mz-resource-id".to_owned(),
140 mz.resource_id().to_owned(),
141 )]
142 .into(),
143 ),
144 ..Default::default()
145 };
146 network_policies.extend([
147 NetworkPolicy {
150 metadata: mz
151 .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
152 spec: Some(NetworkPolicySpec {
153 egress: Some(vec![NetworkPolicyEgressRule {
154 to: Some(vec![NetworkPolicyPeer {
155 pod_selector: Some(all_pods_label_selector.clone()),
156 ..Default::default()
157 }]),
158 ..Default::default()
159 }]),
160 ingress: Some(vec![NetworkPolicyIngressRule {
161 from: Some(vec![NetworkPolicyPeer {
162 pod_selector: Some(all_pods_label_selector.clone()),
163 ..Default::default()
164 }]),
165 ..Default::default()
166 }]),
167 pod_selector: Some(all_pods_label_selector.clone()),
168 policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
169 ..Default::default()
170 }),
171 },
172 NetworkPolicy {
175 metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
176 spec: Some(NetworkPolicySpec {
177 ingress: Some(vec![NetworkPolicyIngressRule {
178 from: Some(vec![NetworkPolicyPeer {
179 namespace_selector: Some(LabelSelector {
180 match_labels: Some(btreemap! {
181 "kubernetes.io/metadata.name".into()
182 => config.orchestratord_namespace.clone(),
183 }),
184 ..Default::default()
185 }),
186 pod_selector: Some(orchestratord_label_selector),
187 ..Default::default()
188 }]),
189 ports: Some(vec![
190 NetworkPolicyPort {
191 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
192 protocol: Some("TCP".to_string()),
193 ..Default::default()
194 },
195 NetworkPolicyPort {
196 port: Some(IntOrString::Int(
197 config.environmentd_internal_http_port.into(),
198 )),
199 protocol: Some("TCP".to_string()),
200 ..Default::default()
201 },
202 ]),
203 ..Default::default()
204 }]),
205 pod_selector: Some(environmentd_label_selector),
206 policy_types: Some(vec!["Ingress".to_owned()]),
207 ..Default::default()
208 }),
209 },
210 ]);
211 }
212 if config.network_policies_ingress_enabled {
213 let mut ingress_label_selector = mz.default_labels();
214 ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
215 network_policies.extend([NetworkPolicy {
216 metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
217 spec: Some(NetworkPolicySpec {
218 ingress: Some(vec![NetworkPolicyIngressRule {
219 from: Some(
220 config
221 .network_policies_ingress_cidrs
222 .iter()
223 .map(|cidr| NetworkPolicyPeer {
224 ip_block: Some(IPBlock {
225 cidr: cidr.to_owned(),
226 except: None,
227 }),
228 ..Default::default()
229 })
230 .collect(),
231 ),
232 ports: Some(vec![
233 NetworkPolicyPort {
234 port: Some(IntOrString::Int(config.environmentd_http_port.into())),
235 protocol: Some("TCP".to_string()),
236 ..Default::default()
237 },
238 NetworkPolicyPort {
239 port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
240 protocol: Some("TCP".to_string()),
241 ..Default::default()
242 },
243 ]),
244 ..Default::default()
245 }]),
246 pod_selector: Some(LabelSelector {
247 match_expressions: None,
248 match_labels: Some(ingress_label_selector),
249 }),
250 policy_types: Some(vec!["Ingress".to_owned()]),
251 ..Default::default()
252 }),
253 }]);
254 }
255 if config.network_policies_egress_enabled {
256 network_policies.extend([NetworkPolicy {
257 metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
258 spec: Some(NetworkPolicySpec {
259 egress: Some(vec![NetworkPolicyEgressRule {
260 to: Some(
261 config
262 .network_policies_egress_cidrs
263 .iter()
264 .map(|cidr| NetworkPolicyPeer {
265 ip_block: Some(IPBlock {
266 cidr: cidr.to_owned(),
267 except: None,
268 }),
269 ..Default::default()
270 })
271 .collect(),
272 ),
273 ..Default::default()
274 }]),
275 pod_selector: Some(LabelSelector {
276 match_expressions: None,
277 match_labels: Some(mz.default_labels()),
278 }),
279 policy_types: Some(vec!["Egress".to_owned()]),
280 ..Default::default()
281 }),
282 }]);
283 }
284 network_policies
285}
286
287fn create_service_account_object(
288 config: &super::Config,
289 mz: &Materialize,
290) -> Option<ServiceAccount> {
291 if mz.create_service_account() {
292 let mut annotations: BTreeMap<String, String> = mz
293 .spec
294 .service_account_annotations
295 .clone()
296 .unwrap_or_default();
297 if let (CloudProvider::Aws, Some(role_arn)) = (
298 config.cloud_provider,
299 mz.spec
300 .environmentd_iam_role_arn
301 .as_deref()
302 .or(config.environmentd_iam_role_arn.as_deref()),
303 ) {
304 warn!(
305 "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
306 );
307 annotations.insert(
308 "eks.amazonaws.com/role-arn".to_string(),
309 role_arn.to_string(),
310 );
311 };
312
313 let mut labels = mz.default_labels();
314 labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
315
316 Some(ServiceAccount {
317 metadata: ObjectMeta {
318 annotations: Some(annotations),
319 labels: Some(labels),
320 ..mz.managed_resource_meta(mz.service_account_name())
321 },
322 ..Default::default()
323 })
324 } else {
325 None
326 }
327}
328
329fn create_role_object(mz: &Materialize) -> Role {
330 Role {
331 metadata: mz.managed_resource_meta(mz.role_name()),
332 rules: Some(vec![
333 PolicyRule {
334 api_groups: Some(vec!["apps".to_string()]),
335 resources: Some(vec!["statefulsets".to_string()]),
336 verbs: vec![
337 "get".to_string(),
338 "list".to_string(),
339 "watch".to_string(),
340 "create".to_string(),
341 "update".to_string(),
342 "patch".to_string(),
343 "delete".to_string(),
344 ],
345 ..Default::default()
346 },
347 PolicyRule {
348 api_groups: Some(vec!["".to_string()]),
349 resources: Some(vec![
350 "persistentvolumeclaims".to_string(),
351 "pods".to_string(),
352 "secrets".to_string(),
353 "services".to_string(),
354 ]),
355 verbs: vec![
356 "get".to_string(),
357 "list".to_string(),
358 "watch".to_string(),
359 "create".to_string(),
360 "update".to_string(),
361 "patch".to_string(),
362 "delete".to_string(),
363 ],
364 ..Default::default()
365 },
366 PolicyRule {
367 api_groups: Some(vec!["".to_string()]),
368 resources: Some(vec!["configmaps".to_string()]),
369 verbs: vec!["get".to_string()],
370 ..Default::default()
371 },
372 PolicyRule {
373 api_groups: Some(vec!["materialize.cloud".to_string()]),
374 resources: Some(vec!["vpcendpoints".to_string()]),
375 verbs: vec![
376 "get".to_string(),
377 "list".to_string(),
378 "watch".to_string(),
379 "create".to_string(),
380 "update".to_string(),
381 "patch".to_string(),
382 "delete".to_string(),
383 ],
384 ..Default::default()
385 },
386 PolicyRule {
387 api_groups: Some(vec!["metrics.k8s.io".to_string()]),
388 resources: Some(vec!["pods".to_string()]),
389 verbs: vec!["get".to_string(), "list".to_string()],
390 ..Default::default()
391 },
392 PolicyRule {
393 api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
394 resources: Some(vec![
395 "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
396 "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
397 ]),
398 verbs: vec!["get".to_string()],
399 ..Default::default()
400 },
401 ]),
402 }
403}
404
405fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
406 RoleBinding {
407 metadata: mz.managed_resource_meta(mz.role_binding_name()),
408 role_ref: RoleRef {
409 api_group: "".to_string(),
410 kind: "Role".to_string(),
411 name: mz.role_name(),
412 },
413 subjects: Some(vec![Subject {
414 api_group: Some("".to_string()),
415 kind: "ServiceAccount".to_string(),
416 name: mz.service_account_name(),
417 namespace: Some(mz.namespace()),
418 }]),
419 }
420}
421
422fn create_environmentd_certificate(
423 config: &super::Config,
424 mz: &Materialize,
425) -> Option<Certificate> {
426 create_certificate(
427 config.default_certificate_specs.internal.clone(),
428 mz,
429 mz.spec.internal_certificate_spec.clone(),
430 mz.environmentd_certificate_name(),
431 mz.environmentd_certificate_secret_name(),
432 Some(vec![
433 mz.environmentd_service_name(),
434 mz.environmentd_service_internal_fqdn(),
435 ]),
436 CertificatePrivateKeyAlgorithm::Ed25519,
437 None,
438 )
439}