1use std::time::Duration;
11
12use k8s_openapi::{
13 api::{
14 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15 core::v1::{
16 Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17 PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18 ServicePort, ServiceSpec, Volume, VolumeMount,
19 },
20 },
21 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28 controller::materialize::{
29 matching_image_from_environmentd_image_ref,
30 tls::{create_certificate, issuer_ref_defined},
31 },
32 k8s::{apply_resource, delete_resource, get_resource},
33};
34use mz_cloud_resources::crd::{
35 generated::cert_manager::certificates::Certificate, materialize::v1alpha1::Materialize,
36};
37use mz_ore::instrument;
38
39pub struct Resources {
40 balancerd_external_certificate: Box<Option<Certificate>>,
41 balancerd_deployment: Box<Deployment>,
42 balancerd_service: Box<Service>,
43}
44
45impl Resources {
46 pub fn new(config: &super::MaterializeControllerArgs, mz: &Materialize) -> Self {
47 let balancerd_external_certificate =
48 Box::new(create_balancerd_external_certificate(config, mz));
49 let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
50 let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
51
52 Self {
53 balancerd_external_certificate,
54 balancerd_deployment,
55 balancerd_service,
56 }
57 }
58
59 #[instrument]
60 pub async fn apply(
61 &self,
62 client: &Client,
63 namespace: &str,
64 ) -> Result<Option<Action>, anyhow::Error> {
65 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
66 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
67 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
68
69 if let Some(certificate) = &*self.balancerd_external_certificate {
70 trace!("creating new balancerd external certificate");
71 apply_resource(&certificate_api, certificate).await?;
72 }
73
74 trace!("creating new balancerd deployment");
75 apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
76
77 trace!("creating new balancerd service");
78 apply_resource(&service_api, &*self.balancerd_service).await?;
79
80 if let Some(deployment) =
81 get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
82 {
83 for condition in deployment
84 .status
85 .as_ref()
86 .and_then(|status| status.conditions.as_deref())
87 .unwrap_or(&[])
88 {
89 if condition.type_ == "Available" && condition.status == "True" {
90 return Ok(None);
91 }
92 }
93 }
94
95 Ok(Some(Action::requeue(Duration::from_secs(1))))
96 }
97
98 #[instrument]
99 pub async fn cleanup(
100 &self,
101 client: &Client,
102 namespace: &str,
103 ) -> Result<Option<Action>, anyhow::Error> {
104 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
105 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
106 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
107
108 trace!("deleting balancerd service");
109 delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
110
111 trace!("deleting balancerd deployment");
112 delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
113
114 if let Some(certificate) = &*self.balancerd_external_certificate {
115 trace!("deleting balancerd external certificate");
116 delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
117 }
118
119 Ok(None)
120 }
121}
122
123fn create_balancerd_external_certificate(
124 config: &super::MaterializeControllerArgs,
125 mz: &Materialize,
126) -> Option<Certificate> {
127 create_certificate(
128 config.default_certificate_specs.balancerd_external.clone(),
129 mz,
130 mz.spec.balancerd_external_certificate_spec.clone(),
131 mz.balancerd_external_certificate_name(),
132 mz.balancerd_external_certificate_secret_name(),
133 None,
134 )
135}
136
137fn create_balancerd_deployment_object(
138 config: &super::MaterializeControllerArgs,
139 mz: &Materialize,
140) -> Deployment {
141 let security_context = if config.enable_security_context {
142 Some(SecurityContext {
146 run_as_non_root: Some(true),
147 capabilities: Some(Capabilities {
148 drop: Some(vec!["ALL".to_string()]),
149 ..Default::default()
150 }),
151 seccomp_profile: Some(SeccompProfile {
152 type_: "RuntimeDefault".to_string(),
153 ..Default::default()
154 }),
155 allow_privilege_escalation: Some(false),
156 ..Default::default()
157 })
158 } else {
159 None
160 };
161
162 let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
163 Some(btreemap! {
164 "prometheus.io/scrape".to_owned() => "true".to_string(),
165 "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
166 "prometheus.io/path".to_owned() => "/metrics".to_string(),
167 "prometheus.io/scheme".to_owned() => "http".to_string(),
168 })
169 } else {
170 None
171 };
172 let mut pod_template_labels = mz.default_labels();
173 pod_template_labels.insert(
174 "materialize.cloud/name".to_owned(),
175 mz.balancerd_deployment_name(),
176 );
177 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
178 pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
179
180 let ports = vec![
181 ContainerPort {
182 container_port: config.balancerd_sql_port.into(),
183 name: Some("pgwire".into()),
184 protocol: Some("TCP".into()),
185 ..Default::default()
186 },
187 ContainerPort {
188 container_port: config.balancerd_http_port.into(),
189 name: Some("http".into()),
190 protocol: Some("TCP".into()),
191 ..Default::default()
192 },
193 ContainerPort {
194 container_port: config.balancerd_internal_http_port.into(),
195 name: Some("internal-http".into()),
196 protocol: Some("TCP".into()),
197 ..Default::default()
198 },
199 ];
200
201 let mut args = vec![
202 "service".to_string(),
203 format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
204 format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
205 format!(
206 "--internal-http-listen-addr=0.0.0.0:{}",
207 config.balancerd_internal_http_port
208 ),
209 format!(
210 "--https-resolver-template={}.{}.svc.cluster.local:{}",
211 mz.environmentd_service_name(),
212 mz.namespace(),
213 config.environmentd_http_port
214 ),
215 format!(
216 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
217 mz.environmentd_service_name(),
218 mz.namespace(),
219 config.environmentd_sql_port
220 ),
221 ];
222
223 if issuer_ref_defined(
224 &config.default_certificate_specs.internal,
225 &mz.spec.internal_certificate_spec,
226 ) {
227 args.push("--internal-tls".to_owned())
228 }
229
230 let mut volumes = Vec::new();
231 let mut volume_mounts = Vec::new();
232 if issuer_ref_defined(
233 &config.default_certificate_specs.balancerd_external,
234 &mz.spec.balancerd_external_certificate_spec,
235 ) {
236 volumes.push(Volume {
237 name: "external-certificate".to_owned(),
238 secret: Some(SecretVolumeSource {
239 default_mode: Some(0o400),
240 secret_name: Some(mz.balancerd_external_certificate_secret_name()),
241 items: None,
242 optional: Some(false),
243 }),
244 ..Default::default()
245 });
246 volume_mounts.push(VolumeMount {
247 name: "external-certificate".to_owned(),
248 mount_path: "/etc/external_tls".to_owned(),
249 read_only: Some(true),
250 ..Default::default()
251 });
252 args.extend([
253 "--tls-mode=require".into(),
254 "--tls-cert=/etc/external_tls/tls.crt".into(),
255 "--tls-key=/etc/external_tls/tls.key".into(),
256 ]);
257 } else {
258 args.push("--tls-mode=disable".to_string());
259 }
260
261 let startup_probe = Probe {
262 http_get: Some(HTTPGetAction {
263 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
264 path: Some("/api/readyz".into()),
265 ..Default::default()
266 }),
267 failure_threshold: Some(20),
268 initial_delay_seconds: Some(3),
269 period_seconds: Some(3),
270 success_threshold: Some(1),
271 timeout_seconds: Some(1),
272 ..Default::default()
273 };
274 let readiness_probe = Probe {
275 http_get: Some(HTTPGetAction {
276 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
277 path: Some("/api/readyz".into()),
278 ..Default::default()
279 }),
280 failure_threshold: Some(3),
281 period_seconds: Some(10),
282 success_threshold: Some(1),
283 timeout_seconds: Some(1),
284 ..Default::default()
285 };
286 let liveness_probe = Probe {
287 http_get: Some(HTTPGetAction {
288 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
289 path: Some("/api/livez".into()),
290 ..Default::default()
291 }),
292 failure_threshold: Some(3),
293 initial_delay_seconds: Some(8),
294 period_seconds: Some(10),
295 success_threshold: Some(1),
296 timeout_seconds: Some(1),
297 ..Default::default()
298 };
299
300 let container = Container {
301 name: "balancerd".to_owned(),
302 image: Some(matching_image_from_environmentd_image_ref(
303 &mz.spec.environmentd_image_ref,
304 "balancerd",
305 None,
306 )),
307 image_pull_policy: Some(config.image_pull_policy.to_string()),
308 ports: Some(ports),
309 args: Some(args),
310 startup_probe: Some(startup_probe),
311 readiness_probe: Some(readiness_probe),
312 liveness_probe: Some(liveness_probe),
313 resources: mz.spec.balancerd_resource_requirements.clone(),
314 security_context: security_context.clone(),
315 volume_mounts: Some(volume_mounts),
316 ..Default::default()
317 };
318
319 let deployment_spec = DeploymentSpec {
320 replicas: Some(1),
321 selector: LabelSelector {
322 match_labels: Some(pod_template_labels.clone()),
323 ..Default::default()
324 },
325 strategy: Some(DeploymentStrategy {
326 rolling_update: Some(RollingUpdateDeployment {
327 max_surge: Some(IntOrString::String("100%".into())),
331 ..Default::default()
332 }),
333 ..Default::default()
334 }),
335 template: PodTemplateSpec {
336 metadata: Some(ObjectMeta {
339 annotations: pod_template_annotations,
340 labels: Some(pod_template_labels),
341 ..Default::default()
342 }),
343 spec: Some(PodSpec {
344 containers: vec![container],
345 node_selector: Some(
346 config
347 .balancerd_node_selector
348 .iter()
349 .map(|selector| (selector.key.clone(), selector.value.clone()))
350 .collect(),
351 ),
352 affinity: config.balancerd_affinity.clone(),
353 tolerations: config.balancerd_tolerations.clone(),
354 security_context: Some(PodSecurityContext {
355 fs_group: Some(999),
356 run_as_user: Some(999),
357 run_as_group: Some(999),
358 ..Default::default()
359 }),
360 scheduler_name: config.scheduler_name.clone(),
361 service_account_name: Some(mz.service_account_name()),
362 volumes: Some(volumes),
363 ..Default::default()
364 }),
365 },
366 ..Default::default()
367 };
368
369 Deployment {
370 metadata: ObjectMeta {
371 ..mz.managed_resource_meta(mz.balancerd_deployment_name())
372 },
373 spec: Some(deployment_spec),
374 status: None,
375 }
376}
377
378fn create_balancerd_service_object(
379 config: &super::MaterializeControllerArgs,
380 mz: &Materialize,
381) -> Service {
382 let selector =
383 btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
384
385 let ports = vec![
386 ServicePort {
387 name: Some("http".to_string()),
388 protocol: Some("TCP".to_string()),
389 port: config.balancerd_http_port.into(),
390 target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
391 ..Default::default()
392 },
393 ServicePort {
394 name: Some("pgwire".to_string()),
395 protocol: Some("TCP".to_string()),
396 port: config.balancerd_sql_port.into(),
397 target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
398 ..Default::default()
399 },
400 ];
401
402 let spec = ServiceSpec {
403 type_: Some("ClusterIP".to_string()),
404 cluster_ip: Some("None".to_string()),
405 selector: Some(selector),
406 ports: Some(ports),
407 ..Default::default()
408 };
409
410 Service {
411 metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
412 spec: Some(spec),
413 status: None,
414 }
415}