1use std::sync::Arc;
11
12use anyhow::bail;
13use k8s_openapi::{
14 api::{
15 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
16 core::v1::{
17 Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
18 PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
19 SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
20 Volume, VolumeMount,
21 },
22 },
23 apimachinery::pkg::{
24 apis::meta::v1::{Condition, LabelSelector, Time},
25 util::intstr::IntOrString,
26 },
27 jiff::Timestamp,
28};
29use kube::{
30 Api, Client, Resource, ResourceExt,
31 api::{DeleteParams, ObjectMeta, PostParams},
32 runtime::{
33 conditions::is_deployment_completed,
34 controller::Action,
35 reflector::{ObjectRef, Store},
36 wait::await_condition,
37 },
38};
39use maplit::btreemap;
40use tracing::{trace, warn};
41
42use crate::{
43 Error,
44 k8s::{apply_resource, make_reflector, replace_resource},
45 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
46};
47use mz_cloud_resources::crd::{
48 ManagedResource,
49 balancer::v1alpha1::{Balancer, Routing},
50 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
51};
52use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
53use mz_ore::{cli::KeyValueArg, instrument};
54
/// Operator-wide settings used when rendering balancerd resources for every
/// `Balancer` handled by this controller.
pub struct Config {
    /// When true, the balancerd container gets a restrictive `SecurityContext`
    /// (non-root, all capabilities dropped, `RuntimeDefault` seccomp profile,
    /// no privilege escalation).
    pub enable_security_context: bool,
    /// When true, `prometheus.io/*` scrape annotations pointing at `/metrics`
    /// on the internal HTTP port are added to the pod template.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy applied to the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name set on the balancerd pod spec, if any.
    pub scheduler_name: Option<String>,
    /// Key/value pairs turned into the balancerd pod's `nodeSelector` map.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied verbatim onto the balancerd pod spec.
    pub balancerd_affinity: Option<Affinity>,
    /// Tolerations copied verbatim onto the balancerd pod spec.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Container resources used when the `Balancer` spec does not set its own
    /// `resource_requirements`.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Default certificate specs. The `balancerd_external` entry is used for
    /// the external TLS certificate, and both it and `internal` are consulted
    /// (via `issuer_ref_defined`) to decide which TLS flags balancerd gets.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// environmentd SQL (pgwire) port, used in the static-routing resolver
    /// address passed to balancerd.
    pub environmentd_sql_port: u16,
    /// environmentd HTTP port, used in the static-routing HTTPS resolver
    /// template passed to balancerd.
    pub environmentd_http_port: u16,
    /// balancerd pgwire port: listen address, container port and service port.
    pub balancerd_sql_port: u16,
    /// balancerd HTTPS port: listen address, container port and service port.
    pub balancerd_http_port: u16,
    /// balancerd internal HTTP port: listen address, container port, probe
    /// target and Prometheus scrape port. It is not exposed by the service.
    pub balancerd_internal_http_port: u16,
}
74
/// Shared state for the `Balancer` controller.
pub struct Context {
    /// Operator-wide settings used to render balancerd resources.
    config: Config,
    /// Reflector-backed cache of `Deployment`s, used to look up a balancer's
    /// existing deployment without issuing requests to the API server.
    deployments: Store<Deployment>,
}
79
80impl Context {
81 pub async fn new(config: Config, client: Client) -> Self {
82 Self {
83 config,
84 deployments: make_reflector(client).await,
85 }
86 }
87
88 async fn sync_deployment_status(
89 &self,
90 client: &Client,
91 balancer: &Balancer,
92 ) -> Result<(), kube::Error> {
93 let namespace = balancer.namespace();
94 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
95
96 let Some(deployment) = self
97 .deployments
98 .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
99 else {
100 return Ok(());
101 };
102
103 let Some(deployment_conditions) = &deployment
104 .status
105 .as_ref()
106 .and_then(|status| status.conditions.as_ref())
107 else {
108 return Ok(());
111 };
112
113 let ready = deployment_conditions
114 .iter()
115 .any(|condition| condition.type_ == "Available" && condition.status == "True");
116 let ready_str = if ready { "True" } else { "False" };
117
118 let mut status = balancer.status.clone().unwrap();
119 if status
120 .conditions
121 .iter()
122 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
123 {
124 return Ok(());
128 }
129
130 status.conditions = vec![Condition {
131 type_: "Ready".to_string(),
132 status: ready_str.to_string(),
133 last_transition_time: Time(Timestamp::now()),
134 message: format!(
135 "balancerd deployment is{} ready",
136 if ready { "" } else { " not" }
137 ),
138 observed_generation: None,
139 reason: "DeploymentStatus".to_string(),
140 }];
141 let mut new_balancer = balancer.clone();
142 new_balancer.status = Some(status);
143
144 balancer_api
145 .replace_status(
146 &balancer.name_unchecked(),
147 &PostParams::default(),
148 &new_balancer,
149 )
150 .await?;
151
152 Ok(())
153 }
154
155 fn create_external_certificate_object(
156 &self,
157 balancer: &Balancer,
158 ) -> Result<Option<Certificate>, anyhow::Error> {
159 create_certificate(
160 self.config
161 .default_certificate_specs
162 .balancerd_external
163 .clone(),
164 balancer,
165 balancer.spec.external_certificate_spec.clone(),
166 balancer.external_certificate_name(),
167 balancer.external_certificate_secret_name(),
168 None,
169 CertificatePrivateKeyAlgorithm::Ecdsa,
170 Some(256),
171 )
172 }
173
174 fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
175 let security_context = if self.config.enable_security_context {
176 Some(SecurityContext {
180 run_as_non_root: Some(true),
181 capabilities: Some(Capabilities {
182 drop: Some(vec!["ALL".to_string()]),
183 ..Default::default()
184 }),
185 seccomp_profile: Some(SeccompProfile {
186 type_: "RuntimeDefault".to_string(),
187 ..Default::default()
188 }),
189 allow_privilege_escalation: Some(false),
190 ..Default::default()
191 })
192 } else {
193 None
194 };
195
196 let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
197 Some(btreemap! {
198 "prometheus.io/scrape".to_owned() => "true".to_string(),
199 "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
200 "prometheus.io/path".to_owned() => "/metrics".to_string(),
201 "prometheus.io/scheme".to_owned() => "http".to_string(),
202 })
203 } else {
204 None
205 };
206 let mut pod_template_labels = balancer.default_labels();
207 pod_template_labels.insert(
208 "materialize.cloud/name".to_owned(),
209 balancer.deployment_name(),
210 );
211 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
212 pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
213
214 let ports = vec![
215 ContainerPort {
216 container_port: self.config.balancerd_sql_port.into(),
217 name: Some("pgwire".into()),
218 protocol: Some("TCP".into()),
219 ..Default::default()
220 },
221 ContainerPort {
222 container_port: self.config.balancerd_http_port.into(),
223 name: Some("http".into()),
224 protocol: Some("TCP".into()),
225 ..Default::default()
226 },
227 ContainerPort {
228 container_port: self.config.balancerd_internal_http_port.into(),
229 name: Some("internal-http".into()),
230 protocol: Some("TCP".into()),
231 ..Default::default()
232 },
233 ];
234
235 let mut args = vec![
236 "service".to_string(),
237 format!(
238 "--pgwire-listen-addr=0.0.0.0:{}",
239 self.config.balancerd_sql_port
240 ),
241 format!(
242 "--https-listen-addr=0.0.0.0:{}",
243 self.config.balancerd_http_port
244 ),
245 format!(
246 "--internal-http-listen-addr=0.0.0.0:{}",
247 self.config.balancerd_internal_http_port
248 ),
249 ];
250 match balancer.routing()? {
251 Routing::Static(static_routing_config) => {
252 args.extend([
253 format!(
254 "--https-resolver-template={}.{}.svc.cluster.local:{}",
255 static_routing_config.environmentd_service_name,
256 static_routing_config.environmentd_namespace,
257 self.config.environmentd_http_port
258 ),
259 format!(
260 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
261 static_routing_config.environmentd_service_name,
262 static_routing_config.environmentd_namespace,
263 self.config.environmentd_sql_port
264 ),
265 ]);
266 }
267 Routing::Frontegg(_frontegg_routing_config) => {
268 bail!("frontegg routing is not yet implemented");
269 }
270 }
271
272 if issuer_ref_defined(
273 &self.config.default_certificate_specs.internal,
274 &balancer.spec.internal_certificate_spec,
275 ) {
276 args.push("--internal-tls".to_owned())
277 }
278
279 let mut volumes = Vec::new();
280 let mut volume_mounts = Vec::new();
281 if issuer_ref_defined(
282 &self.config.default_certificate_specs.balancerd_external,
283 &balancer.spec.external_certificate_spec,
284 ) {
285 volumes.push(Volume {
286 name: "external-certificate".to_owned(),
287 secret: Some(SecretVolumeSource {
288 default_mode: Some(0o400),
289 secret_name: Some(balancer.external_certificate_secret_name()),
290 items: None,
291 optional: Some(false),
292 }),
293 ..Default::default()
294 });
295 volume_mounts.push(VolumeMount {
296 name: "external-certificate".to_owned(),
297 mount_path: "/etc/external_tls".to_owned(),
298 read_only: Some(true),
299 ..Default::default()
300 });
301 args.extend([
302 "--tls-mode=require".into(),
303 "--tls-cert=/etc/external_tls/tls.crt".into(),
304 "--tls-key=/etc/external_tls/tls.key".into(),
305 ]);
306 } else {
307 args.push("--tls-mode=disable".to_string());
308 }
309
310 let startup_probe = Probe {
311 http_get: Some(HTTPGetAction {
312 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
313 path: Some("/api/readyz".into()),
314 ..Default::default()
315 }),
316 failure_threshold: Some(20),
317 initial_delay_seconds: Some(3),
318 period_seconds: Some(3),
319 success_threshold: Some(1),
320 timeout_seconds: Some(1),
321 ..Default::default()
322 };
323 let readiness_probe = Probe {
324 http_get: Some(HTTPGetAction {
325 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
326 path: Some("/api/readyz".into()),
327 ..Default::default()
328 }),
329 failure_threshold: Some(3),
330 period_seconds: Some(10),
331 success_threshold: Some(1),
332 timeout_seconds: Some(1),
333 ..Default::default()
334 };
335 let liveness_probe = Probe {
336 http_get: Some(HTTPGetAction {
337 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
338 path: Some("/api/livez".into()),
339 ..Default::default()
340 }),
341 failure_threshold: Some(3),
342 initial_delay_seconds: Some(8),
343 period_seconds: Some(10),
344 success_threshold: Some(1),
345 timeout_seconds: Some(1),
346 ..Default::default()
347 };
348
349 let container = Container {
350 name: "balancerd".to_owned(),
351 image: Some(balancer.spec.balancerd_image_ref.clone()),
352 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
353 ports: Some(ports),
354 args: Some(args),
355 startup_probe: Some(startup_probe),
356 readiness_probe: Some(readiness_probe),
357 liveness_probe: Some(liveness_probe),
358 resources: balancer
359 .spec
360 .resource_requirements
361 .clone()
362 .or_else(|| self.config.balancerd_default_resources.clone()),
363 security_context: security_context.clone(),
364 volume_mounts: Some(volume_mounts),
365 ..Default::default()
366 };
367
368 let deployment_spec = DeploymentSpec {
369 replicas: Some(balancer.replicas()),
370 selector: LabelSelector {
371 match_labels: Some(pod_template_labels.clone()),
372 ..Default::default()
373 },
374 strategy: Some(DeploymentStrategy {
375 rolling_update: Some(RollingUpdateDeployment {
376 max_surge: Some(IntOrString::String("100%".into())),
380 ..Default::default()
381 }),
382 ..Default::default()
383 }),
384 template: PodTemplateSpec {
385 metadata: Some(ObjectMeta {
388 annotations: pod_template_annotations,
389 labels: Some(pod_template_labels),
390 ..Default::default()
391 }),
392 spec: Some(PodSpec {
393 containers: vec![container],
394 node_selector: Some(
395 self.config
396 .balancerd_node_selector
397 .iter()
398 .map(|selector| (selector.key.clone(), selector.value.clone()))
399 .collect(),
400 ),
401 affinity: self.config.balancerd_affinity.clone(),
402 tolerations: self.config.balancerd_tolerations.clone(),
403 security_context: Some(PodSecurityContext {
404 fs_group: Some(999),
405 run_as_user: Some(999),
406 run_as_group: Some(999),
407 ..Default::default()
408 }),
409 scheduler_name: self.config.scheduler_name.clone(),
410 volumes: Some(volumes),
411 ..Default::default()
412 }),
413 },
414 ..Default::default()
415 };
416
417 Ok(Deployment {
418 metadata: balancer.managed_resource_meta(balancer.deployment_name()),
419 spec: Some(deployment_spec),
420 status: None,
421 })
422 }
423
424 fn create_service_object(&self, balancer: &Balancer) -> Service {
425 let selector =
426 btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
427
428 let ports = vec![
429 ServicePort {
430 name: Some("http".to_string()),
431 protocol: Some("TCP".to_string()),
432 port: self.config.balancerd_http_port.into(),
433 target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
434 ..Default::default()
435 },
436 ServicePort {
437 name: Some("pgwire".to_string()),
438 protocol: Some("TCP".to_string()),
439 port: self.config.balancerd_sql_port.into(),
440 target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
441 ..Default::default()
442 },
443 ];
444
445 let spec = ServiceSpec {
446 type_: Some("ClusterIP".to_string()),
447 cluster_ip: Some("None".to_string()),
448 selector: Some(selector),
449 ports: Some(ports),
450 ..Default::default()
451 };
452
453 Service {
454 metadata: balancer.managed_resource_meta(balancer.service_name()),
455 spec: Some(spec),
456 status: None,
457 }
458 }
459
460 async fn fix_deployment(
463 &self,
464 deployment_api: &Api<Deployment>,
465 new_deployment: &Deployment,
466 ) -> Result<(), Error> {
467 let Some(mut existing_deployment) = self
468 .deployments
469 .get(
470 &ObjectRef::new(&new_deployment.name_unchecked())
471 .within(&new_deployment.namespace().unwrap()),
472 )
473 .map(Arc::unwrap_or_clone)
474 else {
475 return Ok(());
476 };
477
478 if existing_deployment.spec.as_ref().unwrap().selector
479 == new_deployment.spec.as_ref().unwrap().selector
480 {
481 return Ok(());
482 }
483
484 warn!("found existing deployment with old label selector, fixing");
485
486 existing_deployment
489 .spec
490 .as_mut()
491 .unwrap()
492 .template
493 .metadata
494 .as_mut()
495 .unwrap()
496 .labels = new_deployment
497 .spec
498 .as_ref()
499 .unwrap()
500 .template
501 .metadata
502 .as_ref()
503 .unwrap()
504 .labels
505 .clone();
506
507 replace_resource(deployment_api, &existing_deployment).await?;
511 await_condition(
512 deployment_api.clone(),
513 &existing_deployment.name_unchecked(),
514 |deployment: Option<&Deployment>| {
515 let observed_generation = deployment
516 .and_then(|deployment| deployment.status.as_ref())
517 .and_then(|status| status.observed_generation)
518 .unwrap_or(0);
519 let current_generation = deployment
520 .and_then(|deployment| deployment.meta().generation)
521 .unwrap_or(0);
522 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
523 observed_generation == current_generation
524 && current_generation > previous_generation
525 },
526 )
527 .await
528 .map_err(|e| anyhow::anyhow!(e))?;
529 await_condition(
530 deployment_api.clone(),
531 &existing_deployment.name_unchecked(),
532 is_deployment_completed(),
533 )
534 .await
535 .map_err(|e| anyhow::anyhow!(e))?;
536
537 match kube::runtime::wait::delete::delete_and_finalize(
540 deployment_api.clone(),
541 &existing_deployment.name_unchecked(),
542 &DeleteParams::orphan(),
543 )
544 .await
545 {
546 Ok(_) => {}
547 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
548 if e.code == 404 =>
549 {
550 }
552 Err(e) => return Err(anyhow::anyhow!(e).into()),
553 }
554
555 Ok(())
561 }
562}
563
564#[async_trait::async_trait]
565impl k8s_controller::Context for Context {
566 type Resource = Balancer;
567 type Error = Error;
568
569 #[instrument(fields())]
570 async fn apply(
571 &self,
572 client: Client,
573 balancer: &Self::Resource,
574 ) -> Result<Option<Action>, Self::Error> {
575 if balancer.status.is_none() {
576 let balancer_api: Api<Balancer> =
577 Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
578 let mut new_balancer = balancer.clone();
579 new_balancer.status = Some(balancer.status());
580 balancer_api
581 .replace_status(
582 &balancer.name_unchecked(),
583 &PostParams::default(),
584 &new_balancer,
585 )
586 .await?;
587 return Ok(None);
590 }
591
592 let namespace = balancer.namespace();
593 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
594 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
595 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
596
597 if let Some(external_certificate) = self.create_external_certificate_object(balancer)? {
598 trace!("creating new balancerd external certificate");
599 apply_resource(&certificate_api, &external_certificate).await?;
600 }
601
602 let deployment = self.create_deployment_object(balancer)?;
603 self.fix_deployment(&deployment_api, &deployment).await?;
604 trace!("creating new balancerd deployment");
605 apply_resource(&deployment_api, &deployment).await?;
606
607 let service = self.create_service_object(balancer);
608 trace!("creating new balancerd service");
609 apply_resource(&service_api, &service).await?;
610
611 self.sync_deployment_status(&client, balancer).await?;
612
613 Ok(None)
614 }
615}