1use std::time::Duration;
11
12use k8s_openapi::{
13 api::{
14 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15 core::v1::{
16 Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17 PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18 ServicePort, ServiceSpec, Volume, VolumeMount,
19 },
20 },
21 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28 controller::materialize::{
29 Error, matching_image_from_environmentd_image_ref,
30 tls::{create_certificate, issuer_ref_defined},
31 },
32 k8s::{apply_resource, delete_resource, get_resource},
33};
34use mz_cloud_resources::crd::{
35 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
36 materialize::v1alpha1::Materialize,
37};
38use mz_ore::instrument;
39
40pub struct Resources {
41 balancerd_external_certificate: Box<Option<Certificate>>,
42 balancerd_deployment: Box<Deployment>,
43 balancerd_service: Box<Service>,
44}
45
46impl Resources {
47 pub fn new(config: &super::MaterializeControllerArgs, mz: &Materialize) -> Self {
48 let balancerd_external_certificate =
49 Box::new(create_balancerd_external_certificate(config, mz));
50 let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
51 let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
52
53 Self {
54 balancerd_external_certificate,
55 balancerd_deployment,
56 balancerd_service,
57 }
58 }
59
60 #[instrument]
61 pub async fn apply(&self, client: &Client, namespace: &str) -> Result<Option<Action>, Error> {
62 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
63 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
64 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
65
66 if let Some(certificate) = &*self.balancerd_external_certificate {
67 trace!("creating new balancerd external certificate");
68 apply_resource(&certificate_api, certificate).await?;
69 }
70
71 trace!("creating new balancerd deployment");
72 apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
73
74 trace!("creating new balancerd service");
75 apply_resource(&service_api, &*self.balancerd_service).await?;
76
77 if let Some(deployment) =
78 get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
79 {
80 for condition in deployment
81 .status
82 .as_ref()
83 .and_then(|status| status.conditions.as_deref())
84 .unwrap_or(&[])
85 {
86 if condition.type_ == "Available" && condition.status == "True" {
87 return Ok(None);
88 }
89 }
90 }
91
92 Ok(Some(Action::requeue(Duration::from_secs(1))))
93 }
94
95 #[instrument]
96 pub async fn cleanup(
97 &self,
98 client: &Client,
99 namespace: &str,
100 ) -> Result<Option<Action>, anyhow::Error> {
101 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
102 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
103 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
104
105 trace!("deleting balancerd service");
106 delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
107
108 trace!("deleting balancerd deployment");
109 delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
110
111 if let Some(certificate) = &*self.balancerd_external_certificate {
112 trace!("deleting balancerd external certificate");
113 delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
114 }
115
116 Ok(None)
117 }
118}
119
120fn create_balancerd_external_certificate(
121 config: &super::MaterializeControllerArgs,
122 mz: &Materialize,
123) -> Option<Certificate> {
124 create_certificate(
125 config.default_certificate_specs.balancerd_external.clone(),
126 mz,
127 mz.spec.balancerd_external_certificate_spec.clone(),
128 mz.balancerd_external_certificate_name(),
129 mz.balancerd_external_certificate_secret_name(),
130 None,
131 CertificatePrivateKeyAlgorithm::Rsa,
132 Some(4096),
133 )
134}
135
136fn create_balancerd_deployment_object(
137 config: &super::MaterializeControllerArgs,
138 mz: &Materialize,
139) -> Deployment {
140 let security_context = if config.enable_security_context {
141 Some(SecurityContext {
145 run_as_non_root: Some(true),
146 capabilities: Some(Capabilities {
147 drop: Some(vec!["ALL".to_string()]),
148 ..Default::default()
149 }),
150 seccomp_profile: Some(SeccompProfile {
151 type_: "RuntimeDefault".to_string(),
152 ..Default::default()
153 }),
154 allow_privilege_escalation: Some(false),
155 ..Default::default()
156 })
157 } else {
158 None
159 };
160
161 let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
162 Some(btreemap! {
163 "prometheus.io/scrape".to_owned() => "true".to_string(),
164 "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
165 "prometheus.io/path".to_owned() => "/metrics".to_string(),
166 "prometheus.io/scheme".to_owned() => "http".to_string(),
167 })
168 } else {
169 None
170 };
171 let mut pod_template_labels = mz.default_labels();
172 pod_template_labels.insert(
173 "materialize.cloud/name".to_owned(),
174 mz.balancerd_deployment_name(),
175 );
176 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
177 pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
178
179 let ports = vec![
180 ContainerPort {
181 container_port: config.balancerd_sql_port.into(),
182 name: Some("pgwire".into()),
183 protocol: Some("TCP".into()),
184 ..Default::default()
185 },
186 ContainerPort {
187 container_port: config.balancerd_http_port.into(),
188 name: Some("http".into()),
189 protocol: Some("TCP".into()),
190 ..Default::default()
191 },
192 ContainerPort {
193 container_port: config.balancerd_internal_http_port.into(),
194 name: Some("internal-http".into()),
195 protocol: Some("TCP".into()),
196 ..Default::default()
197 },
198 ];
199
200 let mut args = vec![
201 "service".to_string(),
202 format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
203 format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
204 format!(
205 "--internal-http-listen-addr=0.0.0.0:{}",
206 config.balancerd_internal_http_port
207 ),
208 format!(
209 "--https-resolver-template={}.{}.svc.cluster.local:{}",
210 mz.environmentd_service_name(),
211 mz.namespace(),
212 config.environmentd_http_port
213 ),
214 format!(
215 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
216 mz.environmentd_service_name(),
217 mz.namespace(),
218 config.environmentd_sql_port
219 ),
220 ];
221
222 if issuer_ref_defined(
223 &config.default_certificate_specs.internal,
224 &mz.spec.internal_certificate_spec,
225 ) {
226 args.push("--internal-tls".to_owned())
227 }
228
229 let mut volumes = Vec::new();
230 let mut volume_mounts = Vec::new();
231 if issuer_ref_defined(
232 &config.default_certificate_specs.balancerd_external,
233 &mz.spec.balancerd_external_certificate_spec,
234 ) {
235 volumes.push(Volume {
236 name: "external-certificate".to_owned(),
237 secret: Some(SecretVolumeSource {
238 default_mode: Some(0o400),
239 secret_name: Some(mz.balancerd_external_certificate_secret_name()),
240 items: None,
241 optional: Some(false),
242 }),
243 ..Default::default()
244 });
245 volume_mounts.push(VolumeMount {
246 name: "external-certificate".to_owned(),
247 mount_path: "/etc/external_tls".to_owned(),
248 read_only: Some(true),
249 ..Default::default()
250 });
251 args.extend([
252 "--tls-mode=require".into(),
253 "--tls-cert=/etc/external_tls/tls.crt".into(),
254 "--tls-key=/etc/external_tls/tls.key".into(),
255 ]);
256 } else {
257 args.push("--tls-mode=disable".to_string());
258 }
259
260 let startup_probe = Probe {
261 http_get: Some(HTTPGetAction {
262 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
263 path: Some("/api/readyz".into()),
264 ..Default::default()
265 }),
266 failure_threshold: Some(20),
267 initial_delay_seconds: Some(3),
268 period_seconds: Some(3),
269 success_threshold: Some(1),
270 timeout_seconds: Some(1),
271 ..Default::default()
272 };
273 let readiness_probe = Probe {
274 http_get: Some(HTTPGetAction {
275 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
276 path: Some("/api/readyz".into()),
277 ..Default::default()
278 }),
279 failure_threshold: Some(3),
280 period_seconds: Some(10),
281 success_threshold: Some(1),
282 timeout_seconds: Some(1),
283 ..Default::default()
284 };
285 let liveness_probe = Probe {
286 http_get: Some(HTTPGetAction {
287 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
288 path: Some("/api/livez".into()),
289 ..Default::default()
290 }),
291 failure_threshold: Some(3),
292 initial_delay_seconds: Some(8),
293 period_seconds: Some(10),
294 success_threshold: Some(1),
295 timeout_seconds: Some(1),
296 ..Default::default()
297 };
298
299 let container = Container {
300 name: "balancerd".to_owned(),
301 image: Some(matching_image_from_environmentd_image_ref(
302 &mz.spec.environmentd_image_ref,
303 "balancerd",
304 None,
305 )),
306 image_pull_policy: Some(config.image_pull_policy.to_string()),
307 ports: Some(ports),
308 args: Some(args),
309 startup_probe: Some(startup_probe),
310 readiness_probe: Some(readiness_probe),
311 liveness_probe: Some(liveness_probe),
312 resources: mz
313 .spec
314 .balancerd_resource_requirements
315 .clone()
316 .or_else(|| config.balancerd_default_resources.clone()),
317 security_context: security_context.clone(),
318 volume_mounts: Some(volume_mounts),
319 ..Default::default()
320 };
321
322 let deployment_spec = DeploymentSpec {
323 replicas: Some(mz.balancerd_replicas()),
324 selector: LabelSelector {
325 match_labels: Some(pod_template_labels.clone()),
326 ..Default::default()
327 },
328 strategy: Some(DeploymentStrategy {
329 rolling_update: Some(RollingUpdateDeployment {
330 max_surge: Some(IntOrString::String("100%".into())),
334 ..Default::default()
335 }),
336 ..Default::default()
337 }),
338 template: PodTemplateSpec {
339 metadata: Some(ObjectMeta {
342 annotations: pod_template_annotations,
343 labels: Some(pod_template_labels),
344 ..Default::default()
345 }),
346 spec: Some(PodSpec {
347 containers: vec![container],
348 node_selector: Some(
349 config
350 .balancerd_node_selector
351 .iter()
352 .map(|selector| (selector.key.clone(), selector.value.clone()))
353 .collect(),
354 ),
355 affinity: config.balancerd_affinity.clone(),
356 tolerations: config.balancerd_tolerations.clone(),
357 security_context: Some(PodSecurityContext {
358 fs_group: Some(999),
359 run_as_user: Some(999),
360 run_as_group: Some(999),
361 ..Default::default()
362 }),
363 scheduler_name: config.scheduler_name.clone(),
364 service_account_name: Some(mz.service_account_name()),
365 volumes: Some(volumes),
366 ..Default::default()
367 }),
368 },
369 ..Default::default()
370 };
371
372 Deployment {
373 metadata: ObjectMeta {
374 ..mz.managed_resource_meta(mz.balancerd_deployment_name())
375 },
376 spec: Some(deployment_spec),
377 status: None,
378 }
379}
380
381fn create_balancerd_service_object(
382 config: &super::MaterializeControllerArgs,
383 mz: &Materialize,
384) -> Service {
385 let selector =
386 btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
387
388 let ports = vec![
389 ServicePort {
390 name: Some("http".to_string()),
391 protocol: Some("TCP".to_string()),
392 port: config.balancerd_http_port.into(),
393 target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
394 ..Default::default()
395 },
396 ServicePort {
397 name: Some("pgwire".to_string()),
398 protocol: Some("TCP".to_string()),
399 port: config.balancerd_sql_port.into(),
400 target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
401 ..Default::default()
402 },
403 ];
404
405 let spec = ServiceSpec {
406 type_: Some("ClusterIP".to_string()),
407 cluster_ip: Some("None".to_string()),
408 selector: Some(selector),
409 ports: Some(ports),
410 ..Default::default()
411 };
412
413 Service {
414 metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
415 spec: Some(spec),
416 status: None,
417 }
418}