Skip to main content

mz_orchestratord/controller/materialize/
global.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use k8s_openapi::{
13    api::{
14        core::v1::ServiceAccount,
15        networking::v1::{
16            IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule,
17            NetworkPolicyPeer, NetworkPolicyPort, NetworkPolicySpec,
18        },
19        rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
20    },
21    apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use serde::Serialize;
26use tracing::{trace, warn};
27
28use super::Error;
29use crate::k8s::apply_resource;
30use crate::tls::create_certificate;
31use mz_cloud_provider::CloudProvider;
32use mz_cloud_resources::crd::materialize::v1alpha1::Materialize;
33use mz_cloud_resources::crd::{
34    ManagedResource,
35    generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
36};
37use mz_ore::instrument;
38
39#[derive(Debug, Serialize)]
40pub struct Resources {
41    pub environmentd_network_policies: Vec<NetworkPolicy>,
42    pub service_account: Box<Option<ServiceAccount>>,
43    pub role: Box<Role>,
44    pub role_binding: Box<RoleBinding>,
45    pub environmentd_certificate: Box<Option<Certificate>>,
46}
47
48impl Resources {
49    pub fn new(config: &super::Config, mz: &Materialize) -> Self {
50        let environmentd_network_policies = create_environmentd_network_policies(config, mz);
51
52        let service_account = Box::new(create_service_account_object(config, mz));
53        let role = Box::new(create_role_object(mz));
54        let role_binding = Box::new(create_role_binding_object(mz));
55        let environmentd_certificate = Box::new(create_environmentd_certificate(config, mz));
56
57        Self {
58            environmentd_network_policies,
59            service_account,
60            role,
61            role_binding,
62            environmentd_certificate,
63        }
64    }
65
66    #[instrument]
67    pub async fn apply(&self, client: &Client, namespace: &str) -> Result<Option<Action>, Error> {
68        let environmentd_network_policy_api: Api<NetworkPolicy> =
69            Api::namespaced(client.clone(), namespace);
70        let service_account_api: Api<ServiceAccount> = Api::namespaced(client.clone(), namespace);
71        let role_api: Api<Role> = Api::namespaced(client.clone(), namespace);
72        let role_binding_api: Api<RoleBinding> = Api::namespaced(client.clone(), namespace);
73        let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
74
75        for policy in &self.environmentd_network_policies {
76            trace!("applying network policy {}", policy.name_unchecked());
77            apply_resource(&environmentd_network_policy_api, policy).await?;
78        }
79
80        if let Some(service_account) = &*self.service_account {
81            trace!("applying environmentd service account");
82            apply_resource(&service_account_api, service_account).await?;
83        }
84
85        trace!("applying environmentd role");
86        apply_resource(&role_api, &*self.role).await?;
87
88        trace!("applying environmentd role binding");
89        apply_resource(&role_binding_api, &*self.role_binding).await?;
90
91        if let Some(certificate) = &*self.environmentd_certificate {
92            trace!("creating new environmentd certificate");
93            apply_resource(&certificate_api, certificate).await?;
94        }
95
96        Ok(None)
97    }
98}
99
100fn create_environmentd_network_policies(
101    config: &super::Config,
102    mz: &Materialize,
103) -> Vec<NetworkPolicy> {
104    let mut network_policies = Vec::new();
105    if config.network_policies_internal_enabled {
106        let environmentd_label_selector = LabelSelector {
107            match_labels: Some(
108                mz.default_labels()
109                    .into_iter()
110                    .chain([(
111                        "materialize.cloud/app".to_owned(),
112                        mz.environmentd_app_name(),
113                    )])
114                    .collect(),
115            ),
116            ..Default::default()
117        };
118        let orchestratord_label_selector = LabelSelector {
119            match_labels: Some(
120                config
121                    .orchestratord_pod_selector_labels
122                    .iter()
123                    .cloned()
124                    .map(|kv| (kv.key, kv.value))
125                    .collect(),
126            ),
127            ..Default::default()
128        };
129        // TODO (Alex) filter to just clusterd and environmentd,
130        // once we get a consistent set of labels for both.
131        let all_pods_label_selector = LabelSelector {
132            // TODO: can't use default_labels() here because it needs to be
133            // consistent between balancer and materialize resources, and
134            // materialize resources have additional labels - we should
135            // figure out something better here (probably balancers should
136            // install their own network policies)
137            match_labels: Some(
138                [(
139                    "materialize.cloud/mz-resource-id".to_owned(),
140                    mz.resource_id().to_owned(),
141                )]
142                .into(),
143            ),
144            ..Default::default()
145        };
146        network_policies.extend([
147            // Allow all clusterd/environmentd traffic (between pods in the
148            // same environment)
149            NetworkPolicy {
150                metadata: mz
151                    .managed_resource_meta(mz.name_prefixed("allow-all-within-environment")),
152                spec: Some(NetworkPolicySpec {
153                    egress: Some(vec![NetworkPolicyEgressRule {
154                        to: Some(vec![NetworkPolicyPeer {
155                            pod_selector: Some(all_pods_label_selector.clone()),
156                            ..Default::default()
157                        }]),
158                        ..Default::default()
159                    }]),
160                    ingress: Some(vec![NetworkPolicyIngressRule {
161                        from: Some(vec![NetworkPolicyPeer {
162                            pod_selector: Some(all_pods_label_selector.clone()),
163                            ..Default::default()
164                        }]),
165                        ..Default::default()
166                    }]),
167                    pod_selector: Some(all_pods_label_selector.clone()),
168                    policy_types: Some(vec!["Ingress".to_owned(), "Egress".to_owned()]),
169                    ..Default::default()
170                }),
171            },
172            // Allow traffic from orchestratord to environmentd in order to hit
173            // the promotion endpoints during upgrades
174            NetworkPolicy {
175                metadata: mz.managed_resource_meta(mz.name_prefixed("allow-orchestratord")),
176                spec: Some(NetworkPolicySpec {
177                    ingress: Some(vec![NetworkPolicyIngressRule {
178                        from: Some(vec![NetworkPolicyPeer {
179                            namespace_selector: Some(LabelSelector {
180                                match_labels: Some(btreemap! {
181                                    "kubernetes.io/metadata.name".into()
182                                        => config.orchestratord_namespace.clone(),
183                                }),
184                                ..Default::default()
185                            }),
186                            pod_selector: Some(orchestratord_label_selector),
187                            ..Default::default()
188                        }]),
189                        ports: Some(vec![
190                            NetworkPolicyPort {
191                                port: Some(IntOrString::Int(config.environmentd_http_port.into())),
192                                protocol: Some("TCP".to_string()),
193                                ..Default::default()
194                            },
195                            NetworkPolicyPort {
196                                port: Some(IntOrString::Int(
197                                    config.environmentd_internal_http_port.into(),
198                                )),
199                                protocol: Some("TCP".to_string()),
200                                ..Default::default()
201                            },
202                        ]),
203                        ..Default::default()
204                    }]),
205                    pod_selector: Some(environmentd_label_selector),
206                    policy_types: Some(vec!["Ingress".to_owned()]),
207                    ..Default::default()
208                }),
209            },
210        ]);
211    }
212    if config.network_policies_ingress_enabled {
213        let mut ingress_label_selector = mz.default_labels();
214        ingress_label_selector.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
215        network_policies.extend([NetworkPolicy {
216            metadata: mz.managed_resource_meta(mz.name_prefixed("sql-and-http-ingress")),
217            spec: Some(NetworkPolicySpec {
218                ingress: Some(vec![NetworkPolicyIngressRule {
219                    from: Some(
220                        config
221                            .network_policies_ingress_cidrs
222                            .iter()
223                            .map(|cidr| NetworkPolicyPeer {
224                                ip_block: Some(IPBlock {
225                                    cidr: cidr.to_owned(),
226                                    except: None,
227                                }),
228                                ..Default::default()
229                            })
230                            .collect(),
231                    ),
232                    ports: Some(vec![
233                        NetworkPolicyPort {
234                            port: Some(IntOrString::Int(config.environmentd_http_port.into())),
235                            protocol: Some("TCP".to_string()),
236                            ..Default::default()
237                        },
238                        NetworkPolicyPort {
239                            port: Some(IntOrString::Int(config.environmentd_sql_port.into())),
240                            protocol: Some("TCP".to_string()),
241                            ..Default::default()
242                        },
243                    ]),
244                    ..Default::default()
245                }]),
246                pod_selector: Some(LabelSelector {
247                    match_expressions: None,
248                    match_labels: Some(ingress_label_selector),
249                }),
250                policy_types: Some(vec!["Ingress".to_owned()]),
251                ..Default::default()
252            }),
253        }]);
254    }
255    if config.network_policies_egress_enabled {
256        network_policies.extend([NetworkPolicy {
257            metadata: mz.managed_resource_meta(mz.name_prefixed("sources-and-sinks-egress")),
258            spec: Some(NetworkPolicySpec {
259                egress: Some(vec![NetworkPolicyEgressRule {
260                    to: Some(
261                        config
262                            .network_policies_egress_cidrs
263                            .iter()
264                            .map(|cidr| NetworkPolicyPeer {
265                                ip_block: Some(IPBlock {
266                                    cidr: cidr.to_owned(),
267                                    except: None,
268                                }),
269                                ..Default::default()
270                            })
271                            .collect(),
272                    ),
273                    ..Default::default()
274                }]),
275                pod_selector: Some(LabelSelector {
276                    match_expressions: None,
277                    match_labels: Some(mz.default_labels()),
278                }),
279                policy_types: Some(vec!["Egress".to_owned()]),
280                ..Default::default()
281            }),
282        }]);
283    }
284    network_policies
285}
286
287fn create_service_account_object(
288    config: &super::Config,
289    mz: &Materialize,
290) -> Option<ServiceAccount> {
291    if mz.create_service_account() {
292        let mut annotations: BTreeMap<String, String> = mz
293            .spec
294            .service_account_annotations
295            .clone()
296            .unwrap_or_default();
297        if let (CloudProvider::Aws, Some(role_arn)) = (
298            config.cloud_provider,
299            mz.spec
300                .environmentd_iam_role_arn
301                .as_deref()
302                .or(config.environmentd_iam_role_arn.as_deref()),
303        ) {
304            warn!(
305                "Use of Materialize.spec.environmentd_iam_role_arn is deprecated. Please set \"eks.amazonaws.com/role-arn\" in Materialize.spec.service_account_annotations instead."
306            );
307            annotations.insert(
308                "eks.amazonaws.com/role-arn".to_string(),
309                role_arn.to_string(),
310            );
311        };
312
313        let mut labels = mz.default_labels();
314        labels.extend(mz.spec.service_account_labels.clone().unwrap_or_default());
315
316        Some(ServiceAccount {
317            metadata: ObjectMeta {
318                annotations: Some(annotations),
319                labels: Some(labels),
320                ..mz.managed_resource_meta(mz.service_account_name())
321            },
322            ..Default::default()
323        })
324    } else {
325        None
326    }
327}
328
329fn create_role_object(mz: &Materialize) -> Role {
330    Role {
331        metadata: mz.managed_resource_meta(mz.role_name()),
332        rules: Some(vec![
333            PolicyRule {
334                api_groups: Some(vec!["apps".to_string()]),
335                resources: Some(vec!["statefulsets".to_string()]),
336                verbs: vec![
337                    "get".to_string(),
338                    "list".to_string(),
339                    "watch".to_string(),
340                    "create".to_string(),
341                    "update".to_string(),
342                    "patch".to_string(),
343                    "delete".to_string(),
344                ],
345                ..Default::default()
346            },
347            PolicyRule {
348                api_groups: Some(vec!["".to_string()]),
349                resources: Some(vec![
350                    "persistentvolumeclaims".to_string(),
351                    "pods".to_string(),
352                    "secrets".to_string(),
353                    "services".to_string(),
354                ]),
355                verbs: vec![
356                    "get".to_string(),
357                    "list".to_string(),
358                    "watch".to_string(),
359                    "create".to_string(),
360                    "update".to_string(),
361                    "patch".to_string(),
362                    "delete".to_string(),
363                ],
364                ..Default::default()
365            },
366            PolicyRule {
367                api_groups: Some(vec!["".to_string()]),
368                resources: Some(vec!["configmaps".to_string()]),
369                verbs: vec!["get".to_string()],
370                ..Default::default()
371            },
372            PolicyRule {
373                api_groups: Some(vec!["materialize.cloud".to_string()]),
374                resources: Some(vec!["vpcendpoints".to_string()]),
375                verbs: vec![
376                    "get".to_string(),
377                    "list".to_string(),
378                    "watch".to_string(),
379                    "create".to_string(),
380                    "update".to_string(),
381                    "patch".to_string(),
382                    "delete".to_string(),
383                ],
384                ..Default::default()
385            },
386            PolicyRule {
387                api_groups: Some(vec!["metrics.k8s.io".to_string()]),
388                resources: Some(vec!["pods".to_string()]),
389                verbs: vec!["get".to_string(), "list".to_string()],
390                ..Default::default()
391            },
392            PolicyRule {
393                api_groups: Some(vec!["custom.metrics.k8s.io".to_string()]),
394                resources: Some(vec![
395                    "persistentvolumeclaims/kubelet_volume_stats_used_bytes".to_string(),
396                    "persistentvolumeclaims/kubelet_volume_stats_capacity_bytes".to_string(),
397                ]),
398                verbs: vec!["get".to_string()],
399                ..Default::default()
400            },
401        ]),
402    }
403}
404
405fn create_role_binding_object(mz: &Materialize) -> RoleBinding {
406    RoleBinding {
407        metadata: mz.managed_resource_meta(mz.role_binding_name()),
408        role_ref: RoleRef {
409            api_group: "".to_string(),
410            kind: "Role".to_string(),
411            name: mz.role_name(),
412        },
413        subjects: Some(vec![Subject {
414            api_group: Some("".to_string()),
415            kind: "ServiceAccount".to_string(),
416            name: mz.service_account_name(),
417            namespace: Some(mz.namespace()),
418        }]),
419    }
420}
421
422fn create_environmentd_certificate(
423    config: &super::Config,
424    mz: &Materialize,
425) -> Option<Certificate> {
426    create_certificate(
427        config.default_certificate_specs.internal.clone(),
428        mz,
429        mz.spec.internal_certificate_spec.clone(),
430        mz.environmentd_certificate_name(),
431        mz.environmentd_certificate_secret_name(),
432        Some(vec![
433            mz.environmentd_service_name(),
434            mz.environmentd_service_internal_fqdn(),
435        ]),
436        CertificatePrivateKeyAlgorithm::Ed25519,
437        None,
438    )
439}