1use std::time::Duration;
11
12use k8s_openapi::{
13 api::{
14 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15 core::v1::{
16 Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17 PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18 ServicePort, ServiceSpec, Volume, VolumeMount,
19 },
20 },
21 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28 controller::materialize::{
29 matching_image_from_environmentd_image_ref,
30 tls::{create_certificate, issuer_ref_defined},
31 },
32 k8s::{apply_resource, delete_resource, get_resource},
33};
34use mz_cloud_resources::crd::{
35 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
36 materialize::v1alpha1::Materialize,
37};
38use mz_ore::instrument;
39
40pub struct Resources {
41 balancerd_external_certificate: Box<Option<Certificate>>,
42 balancerd_deployment: Box<Deployment>,
43 balancerd_service: Box<Service>,
44}
45
46impl Resources {
47 pub fn new(config: &super::MaterializeControllerArgs, mz: &Materialize) -> Self {
48 let balancerd_external_certificate =
49 Box::new(create_balancerd_external_certificate(config, mz));
50 let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
51 let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
52
53 Self {
54 balancerd_external_certificate,
55 balancerd_deployment,
56 balancerd_service,
57 }
58 }
59
60 #[instrument]
61 pub async fn apply(
62 &self,
63 client: &Client,
64 namespace: &str,
65 ) -> Result<Option<Action>, anyhow::Error> {
66 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
67 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
68 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
69
70 if let Some(certificate) = &*self.balancerd_external_certificate {
71 trace!("creating new balancerd external certificate");
72 apply_resource(&certificate_api, certificate).await?;
73 }
74
75 trace!("creating new balancerd deployment");
76 apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
77
78 trace!("creating new balancerd service");
79 apply_resource(&service_api, &*self.balancerd_service).await?;
80
81 if let Some(deployment) =
82 get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
83 {
84 for condition in deployment
85 .status
86 .as_ref()
87 .and_then(|status| status.conditions.as_deref())
88 .unwrap_or(&[])
89 {
90 if condition.type_ == "Available" && condition.status == "True" {
91 return Ok(None);
92 }
93 }
94 }
95
96 Ok(Some(Action::requeue(Duration::from_secs(1))))
97 }
98
99 #[instrument]
100 pub async fn cleanup(
101 &self,
102 client: &Client,
103 namespace: &str,
104 ) -> Result<Option<Action>, anyhow::Error> {
105 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
106 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
107 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
108
109 trace!("deleting balancerd service");
110 delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
111
112 trace!("deleting balancerd deployment");
113 delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
114
115 if let Some(certificate) = &*self.balancerd_external_certificate {
116 trace!("deleting balancerd external certificate");
117 delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
118 }
119
120 Ok(None)
121 }
122}
123
124fn create_balancerd_external_certificate(
125 config: &super::MaterializeControllerArgs,
126 mz: &Materialize,
127) -> Option<Certificate> {
128 create_certificate(
129 config.default_certificate_specs.balancerd_external.clone(),
130 mz,
131 mz.spec.balancerd_external_certificate_spec.clone(),
132 mz.balancerd_external_certificate_name(),
133 mz.balancerd_external_certificate_secret_name(),
134 None,
135 CertificatePrivateKeyAlgorithm::Rsa,
136 Some(4096),
137 )
138}
139
140fn create_balancerd_deployment_object(
141 config: &super::MaterializeControllerArgs,
142 mz: &Materialize,
143) -> Deployment {
144 let security_context = if config.enable_security_context {
145 Some(SecurityContext {
149 run_as_non_root: Some(true),
150 capabilities: Some(Capabilities {
151 drop: Some(vec!["ALL".to_string()]),
152 ..Default::default()
153 }),
154 seccomp_profile: Some(SeccompProfile {
155 type_: "RuntimeDefault".to_string(),
156 ..Default::default()
157 }),
158 allow_privilege_escalation: Some(false),
159 ..Default::default()
160 })
161 } else {
162 None
163 };
164
165 let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
166 Some(btreemap! {
167 "prometheus.io/scrape".to_owned() => "true".to_string(),
168 "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
169 "prometheus.io/path".to_owned() => "/metrics".to_string(),
170 "prometheus.io/scheme".to_owned() => "http".to_string(),
171 })
172 } else {
173 None
174 };
175 let mut pod_template_labels = mz.default_labels();
176 pod_template_labels.insert(
177 "materialize.cloud/name".to_owned(),
178 mz.balancerd_deployment_name(),
179 );
180 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
181 pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
182
183 let ports = vec![
184 ContainerPort {
185 container_port: config.balancerd_sql_port.into(),
186 name: Some("pgwire".into()),
187 protocol: Some("TCP".into()),
188 ..Default::default()
189 },
190 ContainerPort {
191 container_port: config.balancerd_http_port.into(),
192 name: Some("http".into()),
193 protocol: Some("TCP".into()),
194 ..Default::default()
195 },
196 ContainerPort {
197 container_port: config.balancerd_internal_http_port.into(),
198 name: Some("internal-http".into()),
199 protocol: Some("TCP".into()),
200 ..Default::default()
201 },
202 ];
203
204 let mut args = vec![
205 "service".to_string(),
206 format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
207 format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
208 format!(
209 "--internal-http-listen-addr=0.0.0.0:{}",
210 config.balancerd_internal_http_port
211 ),
212 format!(
213 "--https-resolver-template={}.{}.svc.cluster.local:{}",
214 mz.environmentd_service_name(),
215 mz.namespace(),
216 config.environmentd_http_port
217 ),
218 format!(
219 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
220 mz.environmentd_service_name(),
221 mz.namespace(),
222 config.environmentd_sql_port
223 ),
224 ];
225
226 if issuer_ref_defined(
227 &config.default_certificate_specs.internal,
228 &mz.spec.internal_certificate_spec,
229 ) {
230 args.push("--internal-tls".to_owned())
231 }
232
233 let mut volumes = Vec::new();
234 let mut volume_mounts = Vec::new();
235 if issuer_ref_defined(
236 &config.default_certificate_specs.balancerd_external,
237 &mz.spec.balancerd_external_certificate_spec,
238 ) {
239 volumes.push(Volume {
240 name: "external-certificate".to_owned(),
241 secret: Some(SecretVolumeSource {
242 default_mode: Some(0o400),
243 secret_name: Some(mz.balancerd_external_certificate_secret_name()),
244 items: None,
245 optional: Some(false),
246 }),
247 ..Default::default()
248 });
249 volume_mounts.push(VolumeMount {
250 name: "external-certificate".to_owned(),
251 mount_path: "/etc/external_tls".to_owned(),
252 read_only: Some(true),
253 ..Default::default()
254 });
255 args.extend([
256 "--tls-mode=require".into(),
257 "--tls-cert=/etc/external_tls/tls.crt".into(),
258 "--tls-key=/etc/external_tls/tls.key".into(),
259 ]);
260 } else {
261 args.push("--tls-mode=disable".to_string());
262 }
263
264 let startup_probe = Probe {
265 http_get: Some(HTTPGetAction {
266 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
267 path: Some("/api/readyz".into()),
268 ..Default::default()
269 }),
270 failure_threshold: Some(20),
271 initial_delay_seconds: Some(3),
272 period_seconds: Some(3),
273 success_threshold: Some(1),
274 timeout_seconds: Some(1),
275 ..Default::default()
276 };
277 let readiness_probe = Probe {
278 http_get: Some(HTTPGetAction {
279 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
280 path: Some("/api/readyz".into()),
281 ..Default::default()
282 }),
283 failure_threshold: Some(3),
284 period_seconds: Some(10),
285 success_threshold: Some(1),
286 timeout_seconds: Some(1),
287 ..Default::default()
288 };
289 let liveness_probe = Probe {
290 http_get: Some(HTTPGetAction {
291 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
292 path: Some("/api/livez".into()),
293 ..Default::default()
294 }),
295 failure_threshold: Some(3),
296 initial_delay_seconds: Some(8),
297 period_seconds: Some(10),
298 success_threshold: Some(1),
299 timeout_seconds: Some(1),
300 ..Default::default()
301 };
302
303 let container = Container {
304 name: "balancerd".to_owned(),
305 image: Some(matching_image_from_environmentd_image_ref(
306 &mz.spec.environmentd_image_ref,
307 "balancerd",
308 None,
309 )),
310 image_pull_policy: Some(config.image_pull_policy.to_string()),
311 ports: Some(ports),
312 args: Some(args),
313 startup_probe: Some(startup_probe),
314 readiness_probe: Some(readiness_probe),
315 liveness_probe: Some(liveness_probe),
316 resources: mz
317 .spec
318 .balancerd_resource_requirements
319 .clone()
320 .or_else(|| config.balancerd_default_resources.clone()),
321 security_context: security_context.clone(),
322 volume_mounts: Some(volume_mounts),
323 ..Default::default()
324 };
325
326 let deployment_spec = DeploymentSpec {
327 replicas: Some(mz.balancerd_replicas()),
328 selector: LabelSelector {
329 match_labels: Some(pod_template_labels.clone()),
330 ..Default::default()
331 },
332 strategy: Some(DeploymentStrategy {
333 rolling_update: Some(RollingUpdateDeployment {
334 max_surge: Some(IntOrString::String("100%".into())),
338 ..Default::default()
339 }),
340 ..Default::default()
341 }),
342 template: PodTemplateSpec {
343 metadata: Some(ObjectMeta {
346 annotations: pod_template_annotations,
347 labels: Some(pod_template_labels),
348 ..Default::default()
349 }),
350 spec: Some(PodSpec {
351 containers: vec![container],
352 node_selector: Some(
353 config
354 .balancerd_node_selector
355 .iter()
356 .map(|selector| (selector.key.clone(), selector.value.clone()))
357 .collect(),
358 ),
359 affinity: config.balancerd_affinity.clone(),
360 tolerations: config.balancerd_tolerations.clone(),
361 security_context: Some(PodSecurityContext {
362 fs_group: Some(999),
363 run_as_user: Some(999),
364 run_as_group: Some(999),
365 ..Default::default()
366 }),
367 scheduler_name: config.scheduler_name.clone(),
368 service_account_name: Some(mz.service_account_name()),
369 volumes: Some(volumes),
370 ..Default::default()
371 }),
372 },
373 ..Default::default()
374 };
375
376 Deployment {
377 metadata: ObjectMeta {
378 ..mz.managed_resource_meta(mz.balancerd_deployment_name())
379 },
380 spec: Some(deployment_spec),
381 status: None,
382 }
383}
384
385fn create_balancerd_service_object(
386 config: &super::MaterializeControllerArgs,
387 mz: &Materialize,
388) -> Service {
389 let selector =
390 btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
391
392 let ports = vec![
393 ServicePort {
394 name: Some("http".to_string()),
395 protocol: Some("TCP".to_string()),
396 port: config.balancerd_http_port.into(),
397 target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
398 ..Default::default()
399 },
400 ServicePort {
401 name: Some("pgwire".to_string()),
402 protocol: Some("TCP".to_string()),
403 port: config.balancerd_sql_port.into(),
404 target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
405 ..Default::default()
406 },
407 ];
408
409 let spec = ServiceSpec {
410 type_: Some("ClusterIP".to_string()),
411 cluster_ip: Some("None".to_string()),
412 selector: Some(selector),
413 ports: Some(ports),
414 ..Default::default()
415 };
416
417 Service {
418 metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
419 spec: Some(spec),
420 status: None,
421 }
422}