1use std::sync::Arc;
11
12use anyhow::bail;
13use k8s_openapi::{
14 api::{
15 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
16 core::v1::{
17 Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
18 PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
19 SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
20 Volume, VolumeMount,
21 },
22 },
23 apimachinery::pkg::{
24 apis::meta::v1::{Condition, LabelSelector, Time},
25 util::intstr::IntOrString,
26 },
27 jiff::Timestamp,
28};
29use kube::{
30 Api, Client, Resource, ResourceExt,
31 api::{DeleteParams, ObjectMeta, PostParams},
32 runtime::{
33 conditions::is_deployment_completed,
34 controller::Action,
35 reflector::{ObjectRef, Store},
36 wait::await_condition,
37 },
38};
39use maplit::btreemap;
40use tracing::{trace, warn};
41
42use crate::{
43 Error,
44 k8s::{apply_resource, make_reflector, replace_resource},
45 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
46};
47use mz_cloud_resources::crd::{
48 ManagedResource,
49 balancer::v1alpha1::{Balancer, Routing},
50 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
51};
52use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
53use mz_ore::{cli::KeyValueArg, instrument};
54
55pub struct Config {
56 pub enable_security_context: bool,
57 pub enable_prometheus_scrape_annotations: bool,
58
59 pub image_pull_policy: KubernetesImagePullPolicy,
60 pub scheduler_name: Option<String>,
61 pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
62 pub balancerd_affinity: Option<Affinity>,
63 pub balancerd_tolerations: Option<Vec<Toleration>>,
64 pub balancerd_default_resources: Option<ResourceRequirements>,
65
66 pub default_certificate_specs: DefaultCertificateSpecs,
67
68 pub environmentd_sql_port: u16,
69 pub environmentd_http_port: u16,
70 pub balancerd_sql_port: u16,
71 pub balancerd_http_port: u16,
72 pub balancerd_internal_http_port: u16,
73}
74
75pub struct Context {
76 config: Config,
77 deployments: Store<Deployment>,
78}
79
80impl Context {
81 pub async fn new(config: Config, client: Client) -> Self {
82 Self {
83 config,
84 deployments: make_reflector(client).await,
85 }
86 }
87
88 async fn sync_deployment_status(
89 &self,
90 client: &Client,
91 balancer: &Balancer,
92 ) -> Result<(), kube::Error> {
93 let namespace = balancer.namespace();
94 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
95
96 let Some(deployment) = self
97 .deployments
98 .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
99 else {
100 return Ok(());
101 };
102
103 let Some(deployment_conditions) = &deployment
104 .status
105 .as_ref()
106 .and_then(|status| status.conditions.as_ref())
107 else {
108 return Ok(());
111 };
112
113 let ready = deployment_conditions
114 .iter()
115 .any(|condition| condition.type_ == "Available" && condition.status == "True");
116 let ready_str = if ready { "True" } else { "False" };
117
118 let mut status = balancer.status.clone().unwrap();
119 if status
120 .conditions
121 .iter()
122 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
123 {
124 return Ok(());
128 }
129
130 status.conditions = vec![Condition {
131 type_: "Ready".to_string(),
132 status: ready_str.to_string(),
133 last_transition_time: Time(Timestamp::now()),
134 message: format!(
135 "balancerd deployment is{} ready",
136 if ready { "" } else { " not" }
137 ),
138 observed_generation: None,
139 reason: "DeploymentStatus".to_string(),
140 }];
141 let mut new_balancer = balancer.clone();
142 new_balancer.status = Some(status);
143
144 balancer_api
145 .replace_status(
146 &balancer.name_unchecked(),
147 &PostParams::default(),
148 &new_balancer,
149 )
150 .await?;
151
152 Ok(())
153 }
154
155 fn create_external_certificate_object(&self, balancer: &Balancer) -> Option<Certificate> {
156 create_certificate(
157 self.config
158 .default_certificate_specs
159 .balancerd_external
160 .clone(),
161 balancer,
162 balancer.spec.external_certificate_spec.clone(),
163 balancer.external_certificate_name(),
164 balancer.external_certificate_secret_name(),
165 None,
166 CertificatePrivateKeyAlgorithm::Rsa,
167 Some(4096),
168 )
169 }
170
171 fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
172 let security_context = if self.config.enable_security_context {
173 Some(SecurityContext {
177 run_as_non_root: Some(true),
178 capabilities: Some(Capabilities {
179 drop: Some(vec!["ALL".to_string()]),
180 ..Default::default()
181 }),
182 seccomp_profile: Some(SeccompProfile {
183 type_: "RuntimeDefault".to_string(),
184 ..Default::default()
185 }),
186 allow_privilege_escalation: Some(false),
187 ..Default::default()
188 })
189 } else {
190 None
191 };
192
193 let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
194 Some(btreemap! {
195 "prometheus.io/scrape".to_owned() => "true".to_string(),
196 "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
197 "prometheus.io/path".to_owned() => "/metrics".to_string(),
198 "prometheus.io/scheme".to_owned() => "http".to_string(),
199 })
200 } else {
201 None
202 };
203 let mut pod_template_labels = balancer.default_labels();
204 pod_template_labels.insert(
205 "materialize.cloud/name".to_owned(),
206 balancer.deployment_name(),
207 );
208 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
209 pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
210
211 let ports = vec![
212 ContainerPort {
213 container_port: self.config.balancerd_sql_port.into(),
214 name: Some("pgwire".into()),
215 protocol: Some("TCP".into()),
216 ..Default::default()
217 },
218 ContainerPort {
219 container_port: self.config.balancerd_http_port.into(),
220 name: Some("http".into()),
221 protocol: Some("TCP".into()),
222 ..Default::default()
223 },
224 ContainerPort {
225 container_port: self.config.balancerd_internal_http_port.into(),
226 name: Some("internal-http".into()),
227 protocol: Some("TCP".into()),
228 ..Default::default()
229 },
230 ];
231
232 let mut args = vec![
233 "service".to_string(),
234 format!(
235 "--pgwire-listen-addr=0.0.0.0:{}",
236 self.config.balancerd_sql_port
237 ),
238 format!(
239 "--https-listen-addr=0.0.0.0:{}",
240 self.config.balancerd_http_port
241 ),
242 format!(
243 "--internal-http-listen-addr=0.0.0.0:{}",
244 self.config.balancerd_internal_http_port
245 ),
246 ];
247 match balancer.routing()? {
248 Routing::Static(static_routing_config) => {
249 args.extend([
250 format!(
251 "--https-resolver-template={}.{}.svc.cluster.local:{}",
252 static_routing_config.environmentd_service_name,
253 static_routing_config.environmentd_namespace,
254 self.config.environmentd_http_port
255 ),
256 format!(
257 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
258 static_routing_config.environmentd_service_name,
259 static_routing_config.environmentd_namespace,
260 self.config.environmentd_sql_port
261 ),
262 ]);
263 }
264 Routing::Frontegg(_frontegg_routing_config) => {
265 bail!("frontegg routing is not yet implemented");
266 }
267 }
268
269 if issuer_ref_defined(
270 &self.config.default_certificate_specs.internal,
271 &balancer.spec.internal_certificate_spec,
272 ) {
273 args.push("--internal-tls".to_owned())
274 }
275
276 let mut volumes = Vec::new();
277 let mut volume_mounts = Vec::new();
278 if issuer_ref_defined(
279 &self.config.default_certificate_specs.balancerd_external,
280 &balancer.spec.external_certificate_spec,
281 ) {
282 volumes.push(Volume {
283 name: "external-certificate".to_owned(),
284 secret: Some(SecretVolumeSource {
285 default_mode: Some(0o400),
286 secret_name: Some(balancer.external_certificate_secret_name()),
287 items: None,
288 optional: Some(false),
289 }),
290 ..Default::default()
291 });
292 volume_mounts.push(VolumeMount {
293 name: "external-certificate".to_owned(),
294 mount_path: "/etc/external_tls".to_owned(),
295 read_only: Some(true),
296 ..Default::default()
297 });
298 args.extend([
299 "--tls-mode=require".into(),
300 "--tls-cert=/etc/external_tls/tls.crt".into(),
301 "--tls-key=/etc/external_tls/tls.key".into(),
302 ]);
303 } else {
304 args.push("--tls-mode=disable".to_string());
305 }
306
307 let startup_probe = Probe {
308 http_get: Some(HTTPGetAction {
309 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
310 path: Some("/api/readyz".into()),
311 ..Default::default()
312 }),
313 failure_threshold: Some(20),
314 initial_delay_seconds: Some(3),
315 period_seconds: Some(3),
316 success_threshold: Some(1),
317 timeout_seconds: Some(1),
318 ..Default::default()
319 };
320 let readiness_probe = Probe {
321 http_get: Some(HTTPGetAction {
322 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
323 path: Some("/api/readyz".into()),
324 ..Default::default()
325 }),
326 failure_threshold: Some(3),
327 period_seconds: Some(10),
328 success_threshold: Some(1),
329 timeout_seconds: Some(1),
330 ..Default::default()
331 };
332 let liveness_probe = Probe {
333 http_get: Some(HTTPGetAction {
334 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
335 path: Some("/api/livez".into()),
336 ..Default::default()
337 }),
338 failure_threshold: Some(3),
339 initial_delay_seconds: Some(8),
340 period_seconds: Some(10),
341 success_threshold: Some(1),
342 timeout_seconds: Some(1),
343 ..Default::default()
344 };
345
346 let container = Container {
347 name: "balancerd".to_owned(),
348 image: Some(balancer.spec.balancerd_image_ref.clone()),
349 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
350 ports: Some(ports),
351 args: Some(args),
352 startup_probe: Some(startup_probe),
353 readiness_probe: Some(readiness_probe),
354 liveness_probe: Some(liveness_probe),
355 resources: balancer
356 .spec
357 .resource_requirements
358 .clone()
359 .or_else(|| self.config.balancerd_default_resources.clone()),
360 security_context: security_context.clone(),
361 volume_mounts: Some(volume_mounts),
362 ..Default::default()
363 };
364
365 let deployment_spec = DeploymentSpec {
366 replicas: Some(balancer.replicas()),
367 selector: LabelSelector {
368 match_labels: Some(pod_template_labels.clone()),
369 ..Default::default()
370 },
371 strategy: Some(DeploymentStrategy {
372 rolling_update: Some(RollingUpdateDeployment {
373 max_surge: Some(IntOrString::String("100%".into())),
377 ..Default::default()
378 }),
379 ..Default::default()
380 }),
381 template: PodTemplateSpec {
382 metadata: Some(ObjectMeta {
385 annotations: pod_template_annotations,
386 labels: Some(pod_template_labels),
387 ..Default::default()
388 }),
389 spec: Some(PodSpec {
390 containers: vec![container],
391 node_selector: Some(
392 self.config
393 .balancerd_node_selector
394 .iter()
395 .map(|selector| (selector.key.clone(), selector.value.clone()))
396 .collect(),
397 ),
398 affinity: self.config.balancerd_affinity.clone(),
399 tolerations: self.config.balancerd_tolerations.clone(),
400 security_context: Some(PodSecurityContext {
401 fs_group: Some(999),
402 run_as_user: Some(999),
403 run_as_group: Some(999),
404 ..Default::default()
405 }),
406 scheduler_name: self.config.scheduler_name.clone(),
407 volumes: Some(volumes),
408 ..Default::default()
409 }),
410 },
411 ..Default::default()
412 };
413
414 Ok(Deployment {
415 metadata: balancer.managed_resource_meta(balancer.deployment_name()),
416 spec: Some(deployment_spec),
417 status: None,
418 })
419 }
420
421 fn create_service_object(&self, balancer: &Balancer) -> Service {
422 let selector =
423 btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
424
425 let ports = vec![
426 ServicePort {
427 name: Some("http".to_string()),
428 protocol: Some("TCP".to_string()),
429 port: self.config.balancerd_http_port.into(),
430 target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
431 ..Default::default()
432 },
433 ServicePort {
434 name: Some("pgwire".to_string()),
435 protocol: Some("TCP".to_string()),
436 port: self.config.balancerd_sql_port.into(),
437 target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
438 ..Default::default()
439 },
440 ];
441
442 let spec = ServiceSpec {
443 type_: Some("ClusterIP".to_string()),
444 cluster_ip: Some("None".to_string()),
445 selector: Some(selector),
446 ports: Some(ports),
447 ..Default::default()
448 };
449
450 Service {
451 metadata: balancer.managed_resource_meta(balancer.service_name()),
452 spec: Some(spec),
453 status: None,
454 }
455 }
456
457 async fn fix_deployment(
460 &self,
461 deployment_api: &Api<Deployment>,
462 new_deployment: &Deployment,
463 ) -> Result<(), Error> {
464 let Some(mut existing_deployment) = self
465 .deployments
466 .get(
467 &ObjectRef::new(&new_deployment.name_unchecked())
468 .within(&new_deployment.namespace().unwrap()),
469 )
470 .map(Arc::unwrap_or_clone)
471 else {
472 return Ok(());
473 };
474
475 if existing_deployment.spec.as_ref().unwrap().selector
476 == new_deployment.spec.as_ref().unwrap().selector
477 {
478 return Ok(());
479 }
480
481 warn!("found existing deployment with old label selector, fixing");
482
483 existing_deployment
486 .spec
487 .as_mut()
488 .unwrap()
489 .template
490 .metadata
491 .as_mut()
492 .unwrap()
493 .labels = new_deployment
494 .spec
495 .as_ref()
496 .unwrap()
497 .template
498 .metadata
499 .as_ref()
500 .unwrap()
501 .labels
502 .clone();
503
504 replace_resource(deployment_api, &existing_deployment).await?;
508 await_condition(
509 deployment_api.clone(),
510 &existing_deployment.name_unchecked(),
511 |deployment: Option<&Deployment>| {
512 let observed_generation = deployment
513 .and_then(|deployment| deployment.status.as_ref())
514 .and_then(|status| status.observed_generation)
515 .unwrap_or(0);
516 let current_generation = deployment
517 .and_then(|deployment| deployment.meta().generation)
518 .unwrap_or(0);
519 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
520 observed_generation == current_generation
521 && current_generation > previous_generation
522 },
523 )
524 .await
525 .map_err(|e| anyhow::anyhow!(e))?;
526 await_condition(
527 deployment_api.clone(),
528 &existing_deployment.name_unchecked(),
529 is_deployment_completed(),
530 )
531 .await
532 .map_err(|e| anyhow::anyhow!(e))?;
533
534 match kube::runtime::wait::delete::delete_and_finalize(
537 deployment_api.clone(),
538 &existing_deployment.name_unchecked(),
539 &DeleteParams::orphan(),
540 )
541 .await
542 {
543 Ok(_) => {}
544 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
545 if e.code == 404 =>
546 {
547 }
549 Err(e) => return Err(anyhow::anyhow!(e).into()),
550 }
551
552 Ok(())
558 }
559}
560
561#[async_trait::async_trait]
562impl k8s_controller::Context for Context {
563 type Resource = Balancer;
564 type Error = Error;
565
566 #[instrument(fields())]
567 async fn apply(
568 &self,
569 client: Client,
570 balancer: &Self::Resource,
571 ) -> Result<Option<Action>, Self::Error> {
572 if balancer.status.is_none() {
573 let balancer_api: Api<Balancer> =
574 Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
575 let mut new_balancer = balancer.clone();
576 new_balancer.status = Some(balancer.status());
577 balancer_api
578 .replace_status(
579 &balancer.name_unchecked(),
580 &PostParams::default(),
581 &new_balancer,
582 )
583 .await?;
584 return Ok(None);
587 }
588
589 let namespace = balancer.namespace();
590 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
591 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
592 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
593
594 if let Some(external_certificate) = self.create_external_certificate_object(balancer) {
595 trace!("creating new balancerd external certificate");
596 apply_resource(&certificate_api, &external_certificate).await?;
597 }
598
599 let deployment = self.create_deployment_object(balancer)?;
600 self.fix_deployment(&deployment_api, &deployment).await?;
601 trace!("creating new balancerd deployment");
602 apply_resource(&deployment_api, &deployment).await?;
603
604 let service = self.create_service_object(balancer);
605 trace!("creating new balancerd service");
606 apply_resource(&service_api, &service).await?;
607
608 self.sync_deployment_status(&client, balancer).await?;
609
610 Ok(None)
611 }
612}