1use std::time::Duration;
11
12use k8s_openapi::{
13 api::{
14 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15 core::v1::{
16 Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext, PodSpec,
17 PodTemplateSpec, Probe, SeccompProfile, SecretVolumeSource, SecurityContext, Service,
18 ServicePort, ServiceSpec, Volume, VolumeMount,
19 },
20 },
21 apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
22};
23use kube::{Api, Client, ResourceExt, api::ObjectMeta, runtime::controller::Action};
24use maplit::btreemap;
25use tracing::trace;
26
27use crate::{
28 controller::materialize::{Error, matching_image_from_environmentd_image_ref},
29 k8s::{apply_resource, delete_resource, get_resource},
30 tls::{create_certificate, issuer_ref_defined},
31};
32use mz_cloud_resources::crd::{
33 ManagedResource,
34 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
35 materialize::v1alpha1::Materialize,
36};
37use mz_ore::instrument;
38
39pub struct Resources {
40 balancerd_external_certificate: Box<Option<Certificate>>,
41 balancerd_deployment: Box<Deployment>,
42 balancerd_service: Box<Service>,
43}
44
45impl Resources {
46 pub fn new(config: &super::Config, mz: &Materialize) -> Self {
47 let balancerd_external_certificate =
48 Box::new(create_balancerd_external_certificate(config, mz));
49 let balancerd_deployment = Box::new(create_balancerd_deployment_object(config, mz));
50 let balancerd_service = Box::new(create_balancerd_service_object(config, mz));
51
52 Self {
53 balancerd_external_certificate,
54 balancerd_deployment,
55 balancerd_service,
56 }
57 }
58
59 #[instrument]
60 pub async fn apply(&self, client: &Client, namespace: &str) -> Result<Option<Action>, Error> {
61 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
62 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
63 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
64
65 if let Some(certificate) = &*self.balancerd_external_certificate {
66 trace!("creating new balancerd external certificate");
67 apply_resource(&certificate_api, certificate).await?;
68 }
69
70 trace!("creating new balancerd deployment");
71 apply_resource(&deployment_api, &*self.balancerd_deployment).await?;
72
73 trace!("creating new balancerd service");
74 apply_resource(&service_api, &*self.balancerd_service).await?;
75
76 if let Some(deployment) =
77 get_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?
78 {
79 for condition in deployment
80 .status
81 .as_ref()
82 .and_then(|status| status.conditions.as_deref())
83 .unwrap_or(&[])
84 {
85 if condition.type_ == "Available" && condition.status == "True" {
86 return Ok(None);
87 }
88 }
89 }
90
91 Ok(Some(Action::requeue(Duration::from_secs(1))))
92 }
93
94 #[instrument]
95 pub async fn cleanup(
96 &self,
97 client: &Client,
98 namespace: &str,
99 ) -> Result<Option<Action>, anyhow::Error> {
100 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), namespace);
101 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
102 let service_api: Api<Service> = Api::namespaced(client.clone(), namespace);
103
104 trace!("deleting balancerd service");
105 delete_resource(&service_api, &self.balancerd_service.name_unchecked()).await?;
106
107 trace!("deleting balancerd deployment");
108 delete_resource(&deployment_api, &self.balancerd_deployment.name_unchecked()).await?;
109
110 if let Some(certificate) = &*self.balancerd_external_certificate {
111 trace!("deleting balancerd external certificate");
112 delete_resource(&certificate_api, &certificate.name_unchecked()).await?;
113 }
114
115 Ok(None)
116 }
117}
118
119fn create_balancerd_external_certificate(
120 config: &super::Config,
121 mz: &Materialize,
122) -> Option<Certificate> {
123 create_certificate(
124 config.default_certificate_specs.balancerd_external.clone(),
125 mz,
126 mz.spec.balancerd_external_certificate_spec.clone(),
127 mz.balancerd_external_certificate_name(),
128 mz.balancerd_external_certificate_secret_name(),
129 None,
130 CertificatePrivateKeyAlgorithm::Rsa,
131 Some(4096),
132 )
133}
134
135fn create_balancerd_deployment_object(config: &super::Config, mz: &Materialize) -> Deployment {
136 let security_context = if config.enable_security_context {
137 Some(SecurityContext {
141 run_as_non_root: Some(true),
142 capabilities: Some(Capabilities {
143 drop: Some(vec!["ALL".to_string()]),
144 ..Default::default()
145 }),
146 seccomp_profile: Some(SeccompProfile {
147 type_: "RuntimeDefault".to_string(),
148 ..Default::default()
149 }),
150 allow_privilege_escalation: Some(false),
151 ..Default::default()
152 })
153 } else {
154 None
155 };
156
157 let pod_template_annotations = if config.enable_prometheus_scrape_annotations {
158 Some(btreemap! {
159 "prometheus.io/scrape".to_owned() => "true".to_string(),
160 "prometheus.io/port".to_owned() => config.balancerd_internal_http_port.to_string(),
161 "prometheus.io/path".to_owned() => "/metrics".to_string(),
162 "prometheus.io/scheme".to_owned() => "http".to_string(),
163 })
164 } else {
165 None
166 };
167 let mut pod_template_labels = mz.default_labels();
168 pod_template_labels.insert(
169 "materialize.cloud/name".to_owned(),
170 mz.balancerd_deployment_name(),
171 );
172 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
173 pod_template_labels.insert("materialize.cloud/app".to_owned(), mz.balancerd_app_name());
174
175 let ports = vec![
176 ContainerPort {
177 container_port: config.balancerd_sql_port.into(),
178 name: Some("pgwire".into()),
179 protocol: Some("TCP".into()),
180 ..Default::default()
181 },
182 ContainerPort {
183 container_port: config.balancerd_http_port.into(),
184 name: Some("http".into()),
185 protocol: Some("TCP".into()),
186 ..Default::default()
187 },
188 ContainerPort {
189 container_port: config.balancerd_internal_http_port.into(),
190 name: Some("internal-http".into()),
191 protocol: Some("TCP".into()),
192 ..Default::default()
193 },
194 ];
195
196 let mut args = vec![
197 "service".to_string(),
198 format!("--pgwire-listen-addr=0.0.0.0:{}", config.balancerd_sql_port),
199 format!("--https-listen-addr=0.0.0.0:{}", config.balancerd_http_port),
200 format!(
201 "--internal-http-listen-addr=0.0.0.0:{}",
202 config.balancerd_internal_http_port
203 ),
204 format!(
205 "--https-resolver-template={}.{}.svc.cluster.local:{}",
206 mz.environmentd_service_name(),
207 mz.namespace(),
208 config.environmentd_http_port
209 ),
210 format!(
211 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
212 mz.environmentd_service_name(),
213 mz.namespace(),
214 config.environmentd_sql_port
215 ),
216 ];
217
218 if issuer_ref_defined(
219 &config.default_certificate_specs.internal,
220 &mz.spec.internal_certificate_spec,
221 ) {
222 args.push("--internal-tls".to_owned())
223 }
224
225 let mut volumes = Vec::new();
226 let mut volume_mounts = Vec::new();
227 if issuer_ref_defined(
228 &config.default_certificate_specs.balancerd_external,
229 &mz.spec.balancerd_external_certificate_spec,
230 ) {
231 volumes.push(Volume {
232 name: "external-certificate".to_owned(),
233 secret: Some(SecretVolumeSource {
234 default_mode: Some(0o400),
235 secret_name: Some(mz.balancerd_external_certificate_secret_name()),
236 items: None,
237 optional: Some(false),
238 }),
239 ..Default::default()
240 });
241 volume_mounts.push(VolumeMount {
242 name: "external-certificate".to_owned(),
243 mount_path: "/etc/external_tls".to_owned(),
244 read_only: Some(true),
245 ..Default::default()
246 });
247 args.extend([
248 "--tls-mode=require".into(),
249 "--tls-cert=/etc/external_tls/tls.crt".into(),
250 "--tls-key=/etc/external_tls/tls.key".into(),
251 ]);
252 } else {
253 args.push("--tls-mode=disable".to_string());
254 }
255
256 let startup_probe = Probe {
257 http_get: Some(HTTPGetAction {
258 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
259 path: Some("/api/readyz".into()),
260 ..Default::default()
261 }),
262 failure_threshold: Some(20),
263 initial_delay_seconds: Some(3),
264 period_seconds: Some(3),
265 success_threshold: Some(1),
266 timeout_seconds: Some(1),
267 ..Default::default()
268 };
269 let readiness_probe = Probe {
270 http_get: Some(HTTPGetAction {
271 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
272 path: Some("/api/readyz".into()),
273 ..Default::default()
274 }),
275 failure_threshold: Some(3),
276 period_seconds: Some(10),
277 success_threshold: Some(1),
278 timeout_seconds: Some(1),
279 ..Default::default()
280 };
281 let liveness_probe = Probe {
282 http_get: Some(HTTPGetAction {
283 port: IntOrString::Int(config.balancerd_internal_http_port.into()),
284 path: Some("/api/livez".into()),
285 ..Default::default()
286 }),
287 failure_threshold: Some(3),
288 initial_delay_seconds: Some(8),
289 period_seconds: Some(10),
290 success_threshold: Some(1),
291 timeout_seconds: Some(1),
292 ..Default::default()
293 };
294
295 let container = Container {
296 name: "balancerd".to_owned(),
297 image: Some(matching_image_from_environmentd_image_ref(
298 &mz.spec.environmentd_image_ref,
299 "balancerd",
300 None,
301 )),
302 image_pull_policy: Some(config.image_pull_policy.to_string()),
303 ports: Some(ports),
304 args: Some(args),
305 startup_probe: Some(startup_probe),
306 readiness_probe: Some(readiness_probe),
307 liveness_probe: Some(liveness_probe),
308 resources: mz
309 .spec
310 .balancerd_resource_requirements
311 .clone()
312 .or_else(|| config.balancerd_default_resources.clone()),
313 security_context: security_context.clone(),
314 volume_mounts: Some(volume_mounts),
315 ..Default::default()
316 };
317
318 let deployment_spec = DeploymentSpec {
319 replicas: Some(mz.balancerd_replicas()),
320 selector: LabelSelector {
321 match_labels: Some(pod_template_labels.clone()),
322 ..Default::default()
323 },
324 strategy: Some(DeploymentStrategy {
325 rolling_update: Some(RollingUpdateDeployment {
326 max_surge: Some(IntOrString::String("100%".into())),
330 ..Default::default()
331 }),
332 ..Default::default()
333 }),
334 template: PodTemplateSpec {
335 metadata: Some(ObjectMeta {
338 annotations: pod_template_annotations,
339 labels: Some(pod_template_labels),
340 ..Default::default()
341 }),
342 spec: Some(PodSpec {
343 containers: vec![container],
344 node_selector: Some(
345 config
346 .balancerd_node_selector
347 .iter()
348 .map(|selector| (selector.key.clone(), selector.value.clone()))
349 .collect(),
350 ),
351 affinity: config.balancerd_affinity.clone(),
352 tolerations: config.balancerd_tolerations.clone(),
353 security_context: Some(PodSecurityContext {
354 fs_group: Some(999),
355 run_as_user: Some(999),
356 run_as_group: Some(999),
357 ..Default::default()
358 }),
359 scheduler_name: config.scheduler_name.clone(),
360 service_account_name: Some(mz.service_account_name()),
361 volumes: Some(volumes),
362 ..Default::default()
363 }),
364 },
365 ..Default::default()
366 };
367
368 Deployment {
369 metadata: ObjectMeta {
370 ..mz.managed_resource_meta(mz.balancerd_deployment_name())
371 },
372 spec: Some(deployment_spec),
373 status: None,
374 }
375}
376
377fn create_balancerd_service_object(config: &super::Config, mz: &Materialize) -> Service {
378 let selector =
379 btreemap! {"materialize.cloud/name".to_string() => mz.balancerd_deployment_name()};
380
381 let ports = vec![
382 ServicePort {
383 name: Some("http".to_string()),
384 protocol: Some("TCP".to_string()),
385 port: config.balancerd_http_port.into(),
386 target_port: Some(IntOrString::Int(config.balancerd_http_port.into())),
387 ..Default::default()
388 },
389 ServicePort {
390 name: Some("pgwire".to_string()),
391 protocol: Some("TCP".to_string()),
392 port: config.balancerd_sql_port.into(),
393 target_port: Some(IntOrString::Int(config.balancerd_sql_port.into())),
394 ..Default::default()
395 },
396 ];
397
398 let spec = ServiceSpec {
399 type_: Some("ClusterIP".to_string()),
400 cluster_ip: Some("None".to_string()),
401 selector: Some(selector),
402 ports: Some(ports),
403 ..Default::default()
404 };
405
406 Service {
407 metadata: mz.managed_resource_meta(mz.balancerd_service_name()),
408 spec: Some(spec),
409 status: None,
410 }
411}