1use anyhow::bail;
11use k8s_openapi::{
12 api::{
13 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
14 core::v1::{
15 Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
16 PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
17 SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
18 Volume, VolumeMount,
19 },
20 },
21 apimachinery::pkg::{
22 apis::meta::v1::{Condition, LabelSelector, Time},
23 util::intstr::IntOrString,
24 },
25 jiff::Timestamp,
26};
27use kube::{
28 Api, Client, Resource, ResourceExt,
29 api::{DeleteParams, ObjectMeta, PostParams},
30 runtime::{conditions::is_deployment_completed, controller::Action, wait::await_condition},
31};
32use maplit::btreemap;
33use tracing::{trace, warn};
34
35use crate::{
36 Error,
37 k8s::{apply_resource, get_resource, replace_resource},
38 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
39};
40use mz_cloud_resources::crd::{
41 ManagedResource,
42 balancer::v1alpha1::{Balancer, Routing},
43 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
44};
45use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
46use mz_ore::{cli::KeyValueArg, instrument};
47
/// Controller-wide settings used when rendering the Kubernetes objects for a
/// balancer.
pub struct Config {
    /// When true, the balancerd container is given a restrictive
    /// `SecurityContext` (non-root, all capabilities dropped,
    /// `RuntimeDefault` seccomp profile, no privilege escalation).
    pub enable_security_context: bool,
    /// When true, the pod template is annotated with `prometheus.io/*` keys
    /// pointing at `/metrics` on the internal HTTP port.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy set on the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name copied into the pod spec, if any.
    pub scheduler_name: Option<String>,
    /// Key/value pairs collected into the pod spec's node selector.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied into the pod spec, if any.
    pub balancerd_affinity: Option<Affinity>,
    /// Tolerations copied into the pod spec, if any.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Container resources used when the balancer's own spec does not set
    /// `resource_requirements`.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Default certificate specs; the `balancerd_external` and `internal`
    /// entries are consulted when deciding TLS settings and when creating the
    /// external certificate.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// environmentd SQL port, used in `--static-resolver-addr`.
    pub environmentd_sql_port: u16,
    /// environmentd HTTP port, used in `--https-resolver-template`.
    pub environmentd_http_port: u16,
    /// Port balancerd listens on for pgwire; also exposed by the service.
    pub balancerd_sql_port: u16,
    /// Port balancerd listens on for HTTPS; also exposed by the service.
    pub balancerd_http_port: u16,
    /// Port balancerd listens on for internal HTTP; target of the probes and
    /// of the Prometheus annotations. Not exposed by the service.
    pub balancerd_internal_http_port: u16,
}
67
/// Reconciler for `Balancer` resources; holds the controller [`Config`] used
/// to render the objects it applies.
pub struct Context {
    config: Config,
}
71
72impl Context {
73 pub fn new(config: Config) -> Self {
74 Self { config }
75 }
76
77 async fn sync_deployment_status(
78 &self,
79 client: &Client,
80 balancer: &Balancer,
81 ) -> Result<(), Error> {
82 let namespace = balancer.namespace();
83 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
84 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
85
86 let Some(deployment) = get_resource(&deployment_api, &balancer.deployment_name()).await?
87 else {
88 return Ok(());
89 };
90
91 let Some(deployment_conditions) = &deployment
92 .status
93 .as_ref()
94 .and_then(|status| status.conditions.as_ref())
95 else {
96 return Ok(());
99 };
100
101 let ready = deployment_conditions
102 .iter()
103 .any(|condition| condition.type_ == "Available" && condition.status == "True");
104 let ready_str = if ready { "True" } else { "False" };
105
106 let mut status = balancer.status.clone().unwrap();
107 if status
108 .conditions
109 .iter()
110 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
111 {
112 return Ok(());
116 }
117
118 status.conditions = vec![Condition {
119 type_: "Ready".to_string(),
120 status: ready_str.to_string(),
121 last_transition_time: Time(Timestamp::now()),
122 message: format!(
123 "balancerd deployment is{} ready",
124 if ready { "" } else { " not" }
125 ),
126 observed_generation: None,
127 reason: "DeploymentStatus".to_string(),
128 }];
129 let mut new_balancer = balancer.clone();
130 new_balancer.status = Some(status);
131
132 balancer_api
133 .replace_status(
134 &balancer.name_unchecked(),
135 &PostParams::default(),
136 &new_balancer,
137 )
138 .await?;
139
140 Ok(())
141 }
142
143 fn create_external_certificate_object(
144 &self,
145 balancer: &Balancer,
146 ) -> Result<Option<Certificate>, anyhow::Error> {
147 create_certificate(
148 self.config
149 .default_certificate_specs
150 .balancerd_external
151 .clone(),
152 balancer,
153 balancer.spec.external_certificate_spec.clone(),
154 balancer.external_certificate_name(),
155 balancer.external_certificate_secret_name(),
156 None,
157 CertificatePrivateKeyAlgorithm::Ecdsa,
158 Some(256),
159 )
160 }
161
162 fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
163 let security_context = if self.config.enable_security_context {
164 Some(SecurityContext {
168 run_as_non_root: Some(true),
169 capabilities: Some(Capabilities {
170 drop: Some(vec!["ALL".to_string()]),
171 ..Default::default()
172 }),
173 seccomp_profile: Some(SeccompProfile {
174 type_: "RuntimeDefault".to_string(),
175 ..Default::default()
176 }),
177 allow_privilege_escalation: Some(false),
178 ..Default::default()
179 })
180 } else {
181 None
182 };
183
184 let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
185 Some(btreemap! {
186 "prometheus.io/scrape".to_owned() => "true".to_string(),
187 "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
188 "prometheus.io/path".to_owned() => "/metrics".to_string(),
189 "prometheus.io/scheme".to_owned() => "http".to_string(),
190 })
191 } else {
192 None
193 };
194 let mut pod_template_labels = balancer.default_labels();
195 pod_template_labels.insert(
196 "materialize.cloud/name".to_owned(),
197 balancer.deployment_name(),
198 );
199 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
200 pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
201
202 let ports = vec![
203 ContainerPort {
204 container_port: self.config.balancerd_sql_port.into(),
205 name: Some("pgwire".into()),
206 protocol: Some("TCP".into()),
207 ..Default::default()
208 },
209 ContainerPort {
210 container_port: self.config.balancerd_http_port.into(),
211 name: Some("http".into()),
212 protocol: Some("TCP".into()),
213 ..Default::default()
214 },
215 ContainerPort {
216 container_port: self.config.balancerd_internal_http_port.into(),
217 name: Some("internal-http".into()),
218 protocol: Some("TCP".into()),
219 ..Default::default()
220 },
221 ];
222
223 let mut args = vec![
224 "service".to_string(),
225 format!(
226 "--pgwire-listen-addr=0.0.0.0:{}",
227 self.config.balancerd_sql_port
228 ),
229 format!(
230 "--https-listen-addr=0.0.0.0:{}",
231 self.config.balancerd_http_port
232 ),
233 format!(
234 "--internal-http-listen-addr=0.0.0.0:{}",
235 self.config.balancerd_internal_http_port
236 ),
237 ];
238 match balancer.routing()? {
239 Routing::Static(static_routing_config) => {
240 args.extend([
241 format!(
242 "--https-resolver-template={}.{}.svc.cluster.local:{}",
243 static_routing_config.environmentd_service_name,
244 static_routing_config.environmentd_namespace,
245 self.config.environmentd_http_port
246 ),
247 format!(
248 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
249 static_routing_config.environmentd_service_name,
250 static_routing_config.environmentd_namespace,
251 self.config.environmentd_sql_port
252 ),
253 ]);
254 }
255 Routing::Frontegg(_frontegg_routing_config) => {
256 bail!("frontegg routing is not yet implemented");
257 }
258 }
259
260 if issuer_ref_defined(
261 &self.config.default_certificate_specs.internal,
262 &balancer.spec.internal_certificate_spec,
263 ) {
264 args.push("--internal-tls".to_owned())
265 }
266
267 let mut volumes = Vec::new();
268 let mut volume_mounts = Vec::new();
269 if issuer_ref_defined(
270 &self.config.default_certificate_specs.balancerd_external,
271 &balancer.spec.external_certificate_spec,
272 ) {
273 volumes.push(Volume {
274 name: "external-certificate".to_owned(),
275 secret: Some(SecretVolumeSource {
276 default_mode: Some(0o400),
277 secret_name: Some(balancer.external_certificate_secret_name()),
278 items: None,
279 optional: Some(false),
280 }),
281 ..Default::default()
282 });
283 volume_mounts.push(VolumeMount {
284 name: "external-certificate".to_owned(),
285 mount_path: "/etc/external_tls".to_owned(),
286 read_only: Some(true),
287 ..Default::default()
288 });
289 args.extend([
290 "--tls-mode=require".into(),
291 "--tls-cert=/etc/external_tls/tls.crt".into(),
292 "--tls-key=/etc/external_tls/tls.key".into(),
293 ]);
294 } else {
295 args.push("--tls-mode=disable".to_string());
296 }
297
298 let startup_probe = Probe {
299 http_get: Some(HTTPGetAction {
300 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
301 path: Some("/api/readyz".into()),
302 ..Default::default()
303 }),
304 failure_threshold: Some(20),
305 initial_delay_seconds: Some(3),
306 period_seconds: Some(3),
307 success_threshold: Some(1),
308 timeout_seconds: Some(1),
309 ..Default::default()
310 };
311 let readiness_probe = Probe {
312 http_get: Some(HTTPGetAction {
313 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
314 path: Some("/api/readyz".into()),
315 ..Default::default()
316 }),
317 failure_threshold: Some(3),
318 period_seconds: Some(10),
319 success_threshold: Some(1),
320 timeout_seconds: Some(1),
321 ..Default::default()
322 };
323 let liveness_probe = Probe {
324 http_get: Some(HTTPGetAction {
325 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
326 path: Some("/api/livez".into()),
327 ..Default::default()
328 }),
329 failure_threshold: Some(3),
330 initial_delay_seconds: Some(8),
331 period_seconds: Some(10),
332 success_threshold: Some(1),
333 timeout_seconds: Some(1),
334 ..Default::default()
335 };
336
337 let container = Container {
338 name: "balancerd".to_owned(),
339 image: Some(balancer.spec.balancerd_image_ref.clone()),
340 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
341 ports: Some(ports),
342 args: Some(args),
343 startup_probe: Some(startup_probe),
344 readiness_probe: Some(readiness_probe),
345 liveness_probe: Some(liveness_probe),
346 resources: balancer
347 .spec
348 .resource_requirements
349 .clone()
350 .or_else(|| self.config.balancerd_default_resources.clone()),
351 security_context: security_context.clone(),
352 volume_mounts: Some(volume_mounts),
353 ..Default::default()
354 };
355
356 let deployment_spec = DeploymentSpec {
357 replicas: Some(balancer.replicas()),
358 selector: LabelSelector {
359 match_labels: Some(pod_template_labels.clone()),
360 ..Default::default()
361 },
362 strategy: Some(DeploymentStrategy {
363 rolling_update: Some(RollingUpdateDeployment {
364 max_surge: Some(IntOrString::String("100%".into())),
368 ..Default::default()
369 }),
370 ..Default::default()
371 }),
372 template: PodTemplateSpec {
373 metadata: Some(ObjectMeta {
376 annotations: pod_template_annotations,
377 labels: Some(pod_template_labels),
378 ..Default::default()
379 }),
380 spec: Some(PodSpec {
381 containers: vec![container],
382 node_selector: Some(
383 self.config
384 .balancerd_node_selector
385 .iter()
386 .map(|selector| (selector.key.clone(), selector.value.clone()))
387 .collect(),
388 ),
389 affinity: self.config.balancerd_affinity.clone(),
390 tolerations: self.config.balancerd_tolerations.clone(),
391 security_context: Some(PodSecurityContext {
392 fs_group: Some(999),
393 run_as_user: Some(999),
394 run_as_group: Some(999),
395 ..Default::default()
396 }),
397 scheduler_name: self.config.scheduler_name.clone(),
398 volumes: Some(volumes),
399 ..Default::default()
400 }),
401 },
402 ..Default::default()
403 };
404
405 Ok(Deployment {
406 metadata: balancer.managed_resource_meta(balancer.deployment_name()),
407 spec: Some(deployment_spec),
408 status: None,
409 })
410 }
411
412 fn create_service_object(&self, balancer: &Balancer) -> Service {
413 let selector =
414 btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
415
416 let ports = vec![
417 ServicePort {
418 name: Some("http".to_string()),
419 protocol: Some("TCP".to_string()),
420 port: self.config.balancerd_http_port.into(),
421 target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
422 ..Default::default()
423 },
424 ServicePort {
425 name: Some("pgwire".to_string()),
426 protocol: Some("TCP".to_string()),
427 port: self.config.balancerd_sql_port.into(),
428 target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
429 ..Default::default()
430 },
431 ];
432
433 let spec = ServiceSpec {
434 type_: Some("ClusterIP".to_string()),
435 cluster_ip: Some("None".to_string()),
436 selector: Some(selector),
437 ports: Some(ports),
438 ..Default::default()
439 };
440
441 Service {
442 metadata: balancer.managed_resource_meta(balancer.service_name()),
443 spec: Some(spec),
444 status: None,
445 }
446 }
447
448 async fn fix_deployment(
451 &self,
452 deployment_api: &Api<Deployment>,
453 new_deployment: &Deployment,
454 ) -> Result<(), Error> {
455 let Some(mut existing_deployment) =
456 get_resource(deployment_api, &new_deployment.name_unchecked()).await?
457 else {
458 return Ok(());
459 };
460
461 if existing_deployment.spec.as_ref().unwrap().selector
462 == new_deployment.spec.as_ref().unwrap().selector
463 {
464 return Ok(());
465 }
466
467 warn!("found existing deployment with old label selector, fixing");
468
469 existing_deployment
472 .spec
473 .as_mut()
474 .unwrap()
475 .template
476 .metadata
477 .as_mut()
478 .unwrap()
479 .labels = new_deployment
480 .spec
481 .as_ref()
482 .unwrap()
483 .template
484 .metadata
485 .as_ref()
486 .unwrap()
487 .labels
488 .clone();
489
490 replace_resource(deployment_api, &existing_deployment).await?;
494 await_condition(
495 deployment_api.clone(),
496 &existing_deployment.name_unchecked(),
497 |deployment: Option<&Deployment>| {
498 let observed_generation = deployment
499 .and_then(|deployment| deployment.status.as_ref())
500 .and_then(|status| status.observed_generation)
501 .unwrap_or(0);
502 let current_generation = deployment
503 .and_then(|deployment| deployment.meta().generation)
504 .unwrap_or(0);
505 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
506 observed_generation == current_generation
507 && current_generation > previous_generation
508 },
509 )
510 .await
511 .map_err(|e| anyhow::anyhow!(e))?;
512 await_condition(
513 deployment_api.clone(),
514 &existing_deployment.name_unchecked(),
515 is_deployment_completed(),
516 )
517 .await
518 .map_err(|e| anyhow::anyhow!(e))?;
519
520 match kube::runtime::wait::delete::delete_and_finalize(
523 deployment_api.clone(),
524 &existing_deployment.name_unchecked(),
525 &DeleteParams::orphan(),
526 )
527 .await
528 {
529 Ok(_) => {}
530 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
531 if e.code == 404 =>
532 {
533 }
535 Err(e) => return Err(anyhow::anyhow!(e).into()),
536 }
537
538 Ok(())
544 }
545}
546
547#[async_trait::async_trait]
548impl k8s_controller::Context for Context {
549 type Resource = Balancer;
550 type Error = Error;
551
552 #[instrument(fields())]
553 async fn apply(
554 &self,
555 client: Client,
556 balancer: &Self::Resource,
557 ) -> Result<Option<Action>, Self::Error> {
558 if balancer.status.is_none() {
559 let balancer_api: Api<Balancer> =
560 Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
561 let mut new_balancer = balancer.clone();
562 new_balancer.status = Some(balancer.status());
563 balancer_api
564 .replace_status(
565 &balancer.name_unchecked(),
566 &PostParams::default(),
567 &new_balancer,
568 )
569 .await?;
570 return Ok(None);
573 }
574
575 let namespace = balancer.namespace();
576 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
577 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
578 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
579
580 if let Some(external_certificate) = self.create_external_certificate_object(balancer)? {
581 trace!("creating new balancerd external certificate");
582 apply_resource(&certificate_api, &external_certificate).await?;
583 }
584
585 let deployment = self.create_deployment_object(balancer)?;
586 self.fix_deployment(&deployment_api, &deployment).await?;
587 trace!("creating new balancerd deployment");
588 apply_resource(&deployment_api, &deployment).await?;
589
590 let service = self.create_service_object(balancer);
591 trace!("creating new balancerd service");
592 apply_resource(&service_api, &service).await?;
593
594 self.sync_deployment_status(&client, balancer).await?;
595
596 Ok(None)
597 }
598}