1use anyhow::bail;
11use k8s_controller::TraceMetadata;
12use k8s_openapi::{
13 api::{
14 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
15 core::v1::{
16 Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
17 PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
18 SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
19 Volume, VolumeMount,
20 },
21 },
22 apimachinery::pkg::{
23 apis::meta::v1::{Condition, LabelSelector, Time},
24 util::intstr::IntOrString,
25 },
26 jiff::Timestamp,
27};
28use kube::{
29 Api, Client, Resource, ResourceExt,
30 api::{DeleteParams, ObjectMeta, PostParams},
31 runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
32};
33use maplit::btreemap;
34use tracing::{trace, warn};
35
36use crate::{
37 Error,
38 k8s::{apply_resource, get_resource, replace_resource},
39 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
40};
41use mz_cloud_resources::crd::{
42 ManagedResource,
43 balancer::v1alpha1::{Balancer, Routing},
44 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
45};
46use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
47use mz_ore::{cli::KeyValueArg, instrument};
48
/// Operator-wide settings used when rendering the Kubernetes resources
/// (Deployment, Service, Certificate) for each `Balancer`.
pub struct Config {
    /// When true, the balancerd container gets a restrictive
    /// `SecurityContext` (non-root, all capabilities dropped, `RuntimeDefault`
    /// seccomp profile, no privilege escalation).
    pub enable_security_context: bool,
    /// When true, the balancerd pod template is annotated with
    /// `prometheus.io/*` scrape annotations pointing at `/metrics` on the
    /// internal HTTP port.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy applied to the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Optional scheduler name set on the balancerd pod spec.
    pub scheduler_name: Option<String>,
    /// Node selector key/value pairs applied to balancerd pods.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity applied to balancerd pods.
    pub balancerd_affinity: Option<Affinity>,
    /// Tolerations applied to balancerd pods.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Container resources used when the `Balancer` spec does not set its own
    /// `resource_requirements`.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Default certificate specs passed to the TLS helpers alongside the
    /// per-`Balancer` specs: `balancerd_external` for the external TLS
    /// certificate, `internal` to decide whether `--internal-tls` is enabled.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// environmentd SQL port; used to build balancerd's static resolver
    /// address when static routing is configured.
    pub environmentd_sql_port: u16,
    /// environmentd HTTP port; used to build balancerd's HTTPS resolver
    /// template when static routing is configured.
    pub environmentd_http_port: u16,
    /// Port balancerd listens on for pgwire (SQL) connections; also exposed by
    /// the balancerd Service.
    pub balancerd_sql_port: u16,
    /// Port balancerd listens on for HTTPS; also exposed by the balancerd
    /// Service.
    pub balancerd_http_port: u16,
    /// Port of balancerd's internal HTTP server, used by the health probes
    /// and Prometheus scrape annotations; not exposed by the Service.
    pub balancerd_internal_http_port: u16,
}
68
/// Reconciliation context for `Balancer` resources, holding the operator-wide
/// [`Config`] used to render balancerd's Kubernetes objects.
pub struct Context {
    config: Config,
}
72
73impl Context {
74 pub fn new(config: Config) -> Self {
75 Self { config }
76 }
77
78 async fn sync_deployment_status(
79 &self,
80 client: &Client,
81 balancer: &Balancer,
82 ) -> Result<(), Error> {
83 let namespace = balancer.namespace();
84 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
85 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
86
87 let Some(deployment) = get_resource(&deployment_api, &balancer.deployment_name()).await?
88 else {
89 return Ok(());
90 };
91
92 let Some(deployment_conditions) = &deployment
93 .status
94 .as_ref()
95 .and_then(|status| status.conditions.as_ref())
96 else {
97 return Ok(());
100 };
101
102 let ready = deployment_conditions
103 .iter()
104 .any(|condition| condition.type_ == "Available" && condition.status == "True");
105 let ready_str = if ready { "True" } else { "False" };
106
107 let mut status = balancer.status.clone().unwrap();
108 if status
109 .conditions
110 .iter()
111 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
112 {
113 return Ok(());
117 }
118
119 status.conditions = vec![Condition {
120 type_: "Ready".to_string(),
121 status: ready_str.to_string(),
122 last_transition_time: Time(Timestamp::now()),
123 message: format!(
124 "balancerd deployment is{} ready",
125 if ready { "" } else { " not" }
126 ),
127 observed_generation: None,
128 reason: "DeploymentStatus".to_string(),
129 }];
130 let mut new_balancer = balancer.clone();
131 new_balancer.status = Some(status);
132
133 balancer_api
134 .replace_status(
135 &balancer.name_unchecked(),
136 &PostParams::default(),
137 &new_balancer,
138 )
139 .await?;
140
141 Ok(())
142 }
143
144 fn create_external_certificate_object(
145 &self,
146 balancer: &Balancer,
147 ) -> Result<Option<Certificate>, anyhow::Error> {
148 create_certificate(
149 self.config
150 .default_certificate_specs
151 .balancerd_external
152 .clone(),
153 balancer,
154 balancer.spec.external_certificate_spec.clone(),
155 balancer.external_certificate_name(),
156 balancer.external_certificate_secret_name(),
157 None,
158 CertificatePrivateKeyAlgorithm::Ecdsa,
159 Some(256),
160 )
161 }
162
163 fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
164 let security_context = if self.config.enable_security_context {
165 Some(SecurityContext {
169 run_as_non_root: Some(true),
170 capabilities: Some(Capabilities {
171 drop: Some(vec!["ALL".to_string()]),
172 ..Default::default()
173 }),
174 seccomp_profile: Some(SeccompProfile {
175 type_: "RuntimeDefault".to_string(),
176 ..Default::default()
177 }),
178 allow_privilege_escalation: Some(false),
179 ..Default::default()
180 })
181 } else {
182 None
183 };
184
185 let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
186 Some(btreemap! {
187 "prometheus.io/scrape".to_owned() => "true".to_string(),
188 "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
189 "prometheus.io/path".to_owned() => "/metrics".to_string(),
190 "prometheus.io/scheme".to_owned() => "http".to_string(),
191 })
192 } else {
193 None
194 };
195 let mut pod_template_labels = balancer.default_labels();
196 pod_template_labels.insert(
197 "materialize.cloud/name".to_owned(),
198 balancer.deployment_name(),
199 );
200 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
201 pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
202
203 let ports = vec![
204 ContainerPort {
205 container_port: self.config.balancerd_sql_port.into(),
206 name: Some("pgwire".into()),
207 protocol: Some("TCP".into()),
208 ..Default::default()
209 },
210 ContainerPort {
211 container_port: self.config.balancerd_http_port.into(),
212 name: Some("http".into()),
213 protocol: Some("TCP".into()),
214 ..Default::default()
215 },
216 ContainerPort {
217 container_port: self.config.balancerd_internal_http_port.into(),
218 name: Some("internal-http".into()),
219 protocol: Some("TCP".into()),
220 ..Default::default()
221 },
222 ];
223
224 let mut args = vec![
225 "service".to_string(),
226 format!(
227 "--pgwire-listen-addr=0.0.0.0:{}",
228 self.config.balancerd_sql_port
229 ),
230 format!(
231 "--https-listen-addr=0.0.0.0:{}",
232 self.config.balancerd_http_port
233 ),
234 format!(
235 "--internal-http-listen-addr=0.0.0.0:{}",
236 self.config.balancerd_internal_http_port
237 ),
238 ];
239 match balancer.routing()? {
240 Routing::Static(static_routing_config) => {
241 args.extend([
242 format!(
243 "--https-resolver-template={}.{}.svc.cluster.local:{}",
244 static_routing_config.environmentd_service_name,
245 static_routing_config.environmentd_namespace,
246 self.config.environmentd_http_port
247 ),
248 format!(
249 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
250 static_routing_config.environmentd_service_name,
251 static_routing_config.environmentd_namespace,
252 self.config.environmentd_sql_port
253 ),
254 ]);
255 }
256 Routing::Frontegg(_frontegg_routing_config) => {
257 bail!("frontegg routing is not yet implemented");
258 }
259 }
260
261 if issuer_ref_defined(
262 &self.config.default_certificate_specs.internal,
263 &balancer.spec.internal_certificate_spec,
264 ) {
265 args.push("--internal-tls".to_owned())
266 }
267
268 let mut volumes = Vec::new();
269 let mut volume_mounts = Vec::new();
270 if issuer_ref_defined(
271 &self.config.default_certificate_specs.balancerd_external,
272 &balancer.spec.external_certificate_spec,
273 ) {
274 volumes.push(Volume {
275 name: "external-certificate".to_owned(),
276 secret: Some(SecretVolumeSource {
277 default_mode: Some(0o400),
278 secret_name: Some(balancer.external_certificate_secret_name()),
279 items: None,
280 optional: Some(false),
281 }),
282 ..Default::default()
283 });
284 volume_mounts.push(VolumeMount {
285 name: "external-certificate".to_owned(),
286 mount_path: "/etc/external_tls".to_owned(),
287 read_only: Some(true),
288 ..Default::default()
289 });
290 args.extend([
291 "--tls-mode=require".into(),
292 "--tls-cert=/etc/external_tls/tls.crt".into(),
293 "--tls-key=/etc/external_tls/tls.key".into(),
294 ]);
295 } else {
296 args.push("--tls-mode=disable".to_string());
297 }
298
299 let startup_probe = Probe {
300 http_get: Some(HTTPGetAction {
301 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
302 path: Some("/api/readyz".into()),
303 ..Default::default()
304 }),
305 failure_threshold: Some(20),
306 initial_delay_seconds: Some(3),
307 period_seconds: Some(3),
308 success_threshold: Some(1),
309 timeout_seconds: Some(1),
310 ..Default::default()
311 };
312 let readiness_probe = Probe {
313 http_get: Some(HTTPGetAction {
314 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
315 path: Some("/api/readyz".into()),
316 ..Default::default()
317 }),
318 failure_threshold: Some(3),
319 period_seconds: Some(10),
320 success_threshold: Some(1),
321 timeout_seconds: Some(1),
322 ..Default::default()
323 };
324 let liveness_probe = Probe {
325 http_get: Some(HTTPGetAction {
326 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
327 path: Some("/api/livez".into()),
328 ..Default::default()
329 }),
330 failure_threshold: Some(3),
331 initial_delay_seconds: Some(8),
332 period_seconds: Some(10),
333 success_threshold: Some(1),
334 timeout_seconds: Some(1),
335 ..Default::default()
336 };
337
338 let container = Container {
339 name: "balancerd".to_owned(),
340 image: Some(balancer.spec.balancerd_image_ref.clone()),
341 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
342 ports: Some(ports),
343 args: Some(args),
344 startup_probe: Some(startup_probe),
345 readiness_probe: Some(readiness_probe),
346 liveness_probe: Some(liveness_probe),
347 resources: balancer
348 .spec
349 .resource_requirements
350 .clone()
351 .or_else(|| self.config.balancerd_default_resources.clone()),
352 security_context: security_context.clone(),
353 volume_mounts: Some(volume_mounts),
354 ..Default::default()
355 };
356
357 let deployment_spec = DeploymentSpec {
358 replicas: Some(balancer.replicas()),
359 selector: LabelSelector {
360 match_labels: Some(pod_template_labels.clone()),
361 ..Default::default()
362 },
363 strategy: Some(DeploymentStrategy {
364 rolling_update: Some(RollingUpdateDeployment {
365 max_surge: Some(IntOrString::String("100%".into())),
369 ..Default::default()
370 }),
371 ..Default::default()
372 }),
373 template: PodTemplateSpec {
374 metadata: Some(ObjectMeta {
377 annotations: pod_template_annotations,
378 labels: Some(pod_template_labels),
379 ..Default::default()
380 }),
381 spec: Some(PodSpec {
382 containers: vec![container],
383 node_selector: Some(
384 self.config
385 .balancerd_node_selector
386 .iter()
387 .map(|selector| (selector.key.clone(), selector.value.clone()))
388 .collect(),
389 ),
390 affinity: self.config.balancerd_affinity.clone(),
391 tolerations: self.config.balancerd_tolerations.clone(),
392 security_context: Some(PodSecurityContext {
393 fs_group: Some(999),
394 run_as_user: Some(999),
395 run_as_group: Some(999),
396 ..Default::default()
397 }),
398 scheduler_name: self.config.scheduler_name.clone(),
399 volumes: Some(volumes),
400 ..Default::default()
401 }),
402 },
403 ..Default::default()
404 };
405
406 Ok(Deployment {
407 metadata: balancer.managed_resource_meta(balancer.deployment_name()),
408 spec: Some(deployment_spec),
409 status: None,
410 })
411 }
412
413 fn create_service_object(&self, balancer: &Balancer) -> Service {
414 let selector =
415 btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
416
417 let ports = vec![
418 ServicePort {
419 name: Some("http".to_string()),
420 protocol: Some("TCP".to_string()),
421 port: self.config.balancerd_http_port.into(),
422 target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
423 ..Default::default()
424 },
425 ServicePort {
426 name: Some("pgwire".to_string()),
427 protocol: Some("TCP".to_string()),
428 port: self.config.balancerd_sql_port.into(),
429 target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
430 ..Default::default()
431 },
432 ];
433
434 let spec = ServiceSpec {
435 type_: Some("ClusterIP".to_string()),
436 cluster_ip: Some("None".to_string()),
437 selector: Some(selector),
438 ports: Some(ports),
439 ..Default::default()
440 };
441
442 Service {
443 metadata: balancer.managed_resource_meta(balancer.service_name()),
444 spec: Some(spec),
445 status: None,
446 }
447 }
448
449 async fn fix_deployment(
452 &self,
453 deployment_api: &Api<Deployment>,
454 new_deployment: &Deployment,
455 ) -> Result<(), Error> {
456 let Some(mut existing_deployment) =
457 get_resource(deployment_api, &new_deployment.name_unchecked()).await?
458 else {
459 return Ok(());
460 };
461
462 if existing_deployment.spec.as_ref().unwrap().selector
463 == new_deployment.spec.as_ref().unwrap().selector
464 {
465 return Ok(());
466 }
467
468 warn!("found existing deployment with old label selector, fixing");
469
470 existing_deployment
473 .spec
474 .as_mut()
475 .unwrap()
476 .template
477 .metadata
478 .as_mut()
479 .unwrap()
480 .labels = new_deployment
481 .spec
482 .as_ref()
483 .unwrap()
484 .template
485 .metadata
486 .as_ref()
487 .unwrap()
488 .labels
489 .clone();
490
491 replace_resource(deployment_api, &existing_deployment).await?;
495 await_condition(
496 deployment_api.clone(),
497 &existing_deployment.name_unchecked(),
498 |deployment: Option<&Deployment>| {
499 let observed_generation = deployment
500 .and_then(|deployment| deployment.status.as_ref())
501 .and_then(|status| status.observed_generation)
502 .unwrap_or(0);
503 let current_generation = deployment
504 .and_then(|deployment| deployment.meta().generation)
505 .unwrap_or(0);
506 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
507 observed_generation == current_generation
508 && current_generation > previous_generation
509 },
510 )
511 .await
512 .map_err(|e| anyhow::anyhow!(e))?;
513 await_condition(
514 deployment_api.clone(),
515 &existing_deployment.name_unchecked(),
516 is_deployment_completed(),
517 )
518 .await
519 .map_err(|e| anyhow::anyhow!(e))?;
520
521 match kube::runtime::wait::delete::delete_and_finalize(
524 deployment_api.clone(),
525 &existing_deployment.name_unchecked(),
526 &DeleteParams::orphan(),
527 )
528 .await
529 {
530 Ok(_) => {}
531 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
532 if e.code == 404 =>
533 {
534 }
536 Err(e) => return Err(anyhow::anyhow!(e).into()),
537 }
538
539 Ok(())
545 }
546}
547
548#[async_trait::async_trait]
549impl k8s_controller::Context for Context {
550 type Resource = Balancer;
551 type Error = Error;
552
553 #[instrument(fields())]
554 async fn apply(
555 &self,
556 client: Client,
557 balancer: &Self::Resource,
558 _metadata: &mut TraceMetadata,
559 ) -> Result<Option<Action>, Self::Error> {
560 if balancer.status.is_none() {
561 let balancer_api: Api<Balancer> =
562 Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
563 let mut new_balancer = balancer.clone();
564 new_balancer.status = Some(balancer.status());
565 balancer_api
566 .replace_status(
567 &balancer.name_unchecked(),
568 &PostParams::default(),
569 &new_balancer,
570 )
571 .await?;
572 return Ok(None);
575 }
576
577 let namespace = balancer.namespace();
578 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
579 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
580 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
581
582 if let Some(external_certificate) = self.create_external_certificate_object(balancer)? {
583 trace!("creating new balancerd external certificate");
584 apply_resource(&certificate_api, &external_certificate).await?;
585 }
586
587 let deployment = self.create_deployment_object(balancer)?;
588 self.fix_deployment(&deployment_api, &deployment).await?;
589 trace!("creating new balancerd deployment");
590 apply_resource(&deployment_api, &deployment).await?;
591
592 let service = self.create_service_object(balancer);
593 trace!("creating new balancerd service");
594 apply_resource(&service_api, &service).await?;
595
596 self.sync_deployment_status(&client, balancer).await?;
597
598 Ok(None)
599 }
600}