1use std::sync::Arc;
11
12use anyhow::bail;
13use k8s_openapi::{
14 api::{
15 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
16 core::v1::{
17 Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
18 PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
19 SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
20 Volume, VolumeMount,
21 },
22 },
23 apimachinery::pkg::{
24 apis::meta::v1::{Condition, LabelSelector, Time},
25 util::intstr::IntOrString,
26 },
27};
28use kube::{
29 Api, Client, Resource, ResourceExt,
30 api::{DeleteParams, ObjectMeta, PostParams},
31 runtime::{
32 conditions::is_deployment_completed,
33 controller::Action,
34 reflector::{ObjectRef, Store},
35 wait::await_condition,
36 },
37};
38use maplit::btreemap;
39use tracing::{trace, warn};
40
41use crate::{
42 Error,
43 k8s::{apply_resource, make_reflector, replace_resource},
44 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
45};
46use mz_cloud_resources::crd::{
47 ManagedResource,
48 balancer::v1alpha1::{Balancer, Routing},
49 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
50};
51use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
52use mz_ore::{cli::KeyValueArg, instrument};
53
/// Static settings used when building the Kubernetes objects for a balancer.
pub struct Config {
    /// When set, the balancerd container is given a restrictive
    /// `SecurityContext` (non-root, all capabilities dropped, `RuntimeDefault`
    /// seccomp profile, no privilege escalation).
    pub enable_security_context: bool,
    /// When set, the pod template is annotated with `prometheus.io/*` scrape
    /// annotations pointing at the internal HTTP port.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy written onto the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name copied into the pod spec.
    pub scheduler_name: Option<String>,
    /// Key/value pairs collected into the pod spec's node selector.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied into the pod spec.
    pub balancerd_affinity: Option<Affinity>,
    /// Tolerations copied into the pod spec.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Container resources used when the balancer spec does not provide its
    /// own `resource_requirements`.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Default certificate specs; this file reads the `balancerd_external`
    /// and `internal` entries.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// Port appended to the `--static-resolver-addr` argument.
    pub environmentd_sql_port: u16,
    /// Port appended to the `--https-resolver-template` argument.
    pub environmentd_http_port: u16,
    /// Port balancerd listens on for pgwire; also exposed by the service.
    pub balancerd_sql_port: u16,
    /// Port balancerd listens on for HTTPS; also exposed by the service.
    pub balancerd_http_port: u16,
    /// Port balancerd listens on for internal HTTP; used by the probes and
    /// the prometheus annotations.
    pub balancerd_internal_http_port: u16,
}
73
/// Controller state for `Balancer` resources: the static configuration plus a
/// reflector store of `Deployment` objects.
pub struct Context {
    /// Settings used when building Kubernetes objects.
    config: Config,
    /// Deployment store obtained from `make_reflector` in [`Context::new`].
    deployments: Store<Deployment>,
}
78
79impl Context {
80 pub async fn new(config: Config, client: Client) -> Self {
81 Self {
82 config,
83 deployments: make_reflector(client).await,
84 }
85 }
86
87 async fn sync_deployment_status(
88 &self,
89 client: &Client,
90 balancer: &Balancer,
91 ) -> Result<(), kube::Error> {
92 let namespace = balancer.namespace();
93 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
94
95 let Some(deployment) = self
96 .deployments
97 .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
98 else {
99 return Ok(());
100 };
101
102 let Some(deployment_conditions) = &deployment
103 .status
104 .as_ref()
105 .and_then(|status| status.conditions.as_ref())
106 else {
107 return Ok(());
110 };
111
112 let ready = deployment_conditions
113 .iter()
114 .any(|condition| condition.type_ == "Available" && condition.status == "True");
115 let ready_str = if ready { "True" } else { "False" };
116
117 let mut status = balancer.status.clone().unwrap();
118 if status
119 .conditions
120 .iter()
121 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
122 {
123 return Ok(());
127 }
128
129 status.conditions = vec![Condition {
130 type_: "Ready".to_string(),
131 status: ready_str.to_string(),
132 last_transition_time: Time(chrono::offset::Utc::now()),
133 message: format!(
134 "balancerd deployment is{} ready",
135 if ready { "" } else { " not" }
136 ),
137 observed_generation: None,
138 reason: "DeploymentStatus".to_string(),
139 }];
140 let mut new_balancer = balancer.clone();
141 new_balancer.status = Some(status);
142
143 balancer_api
144 .replace_status(
145 &balancer.name_unchecked(),
146 &PostParams::default(),
147 serde_json::to_vec(&new_balancer).unwrap(),
148 )
149 .await?;
150
151 Ok(())
152 }
153
154 fn create_external_certificate_object(&self, balancer: &Balancer) -> Option<Certificate> {
155 create_certificate(
156 self.config
157 .default_certificate_specs
158 .balancerd_external
159 .clone(),
160 balancer,
161 balancer.spec.external_certificate_spec.clone(),
162 balancer.external_certificate_name(),
163 balancer.external_certificate_secret_name(),
164 None,
165 CertificatePrivateKeyAlgorithm::Rsa,
166 Some(4096),
167 )
168 }
169
170 fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
171 let security_context = if self.config.enable_security_context {
172 Some(SecurityContext {
176 run_as_non_root: Some(true),
177 capabilities: Some(Capabilities {
178 drop: Some(vec!["ALL".to_string()]),
179 ..Default::default()
180 }),
181 seccomp_profile: Some(SeccompProfile {
182 type_: "RuntimeDefault".to_string(),
183 ..Default::default()
184 }),
185 allow_privilege_escalation: Some(false),
186 ..Default::default()
187 })
188 } else {
189 None
190 };
191
192 let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
193 Some(btreemap! {
194 "prometheus.io/scrape".to_owned() => "true".to_string(),
195 "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
196 "prometheus.io/path".to_owned() => "/metrics".to_string(),
197 "prometheus.io/scheme".to_owned() => "http".to_string(),
198 })
199 } else {
200 None
201 };
202 let mut pod_template_labels = balancer.default_labels();
203 pod_template_labels.insert(
204 "materialize.cloud/name".to_owned(),
205 balancer.deployment_name(),
206 );
207 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
208 pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
209
210 let ports = vec![
211 ContainerPort {
212 container_port: self.config.balancerd_sql_port.into(),
213 name: Some("pgwire".into()),
214 protocol: Some("TCP".into()),
215 ..Default::default()
216 },
217 ContainerPort {
218 container_port: self.config.balancerd_http_port.into(),
219 name: Some("http".into()),
220 protocol: Some("TCP".into()),
221 ..Default::default()
222 },
223 ContainerPort {
224 container_port: self.config.balancerd_internal_http_port.into(),
225 name: Some("internal-http".into()),
226 protocol: Some("TCP".into()),
227 ..Default::default()
228 },
229 ];
230
231 let mut args = vec![
232 "service".to_string(),
233 format!(
234 "--pgwire-listen-addr=0.0.0.0:{}",
235 self.config.balancerd_sql_port
236 ),
237 format!(
238 "--https-listen-addr=0.0.0.0:{}",
239 self.config.balancerd_http_port
240 ),
241 format!(
242 "--internal-http-listen-addr=0.0.0.0:{}",
243 self.config.balancerd_internal_http_port
244 ),
245 ];
246 match balancer.routing()? {
247 Routing::Static(static_routing_config) => {
248 args.extend([
249 format!(
250 "--https-resolver-template={}.{}.svc.cluster.local:{}",
251 static_routing_config.environmentd_service_name,
252 static_routing_config.environmentd_namespace,
253 self.config.environmentd_http_port
254 ),
255 format!(
256 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
257 static_routing_config.environmentd_service_name,
258 static_routing_config.environmentd_namespace,
259 self.config.environmentd_sql_port
260 ),
261 ]);
262 }
263 Routing::Frontegg(_frontegg_routing_config) => {
264 bail!("frontegg routing is not yet implemented");
265 }
266 }
267
268 if issuer_ref_defined(
269 &self.config.default_certificate_specs.internal,
270 &balancer.spec.internal_certificate_spec,
271 ) {
272 args.push("--internal-tls".to_owned())
273 }
274
275 let mut volumes = Vec::new();
276 let mut volume_mounts = Vec::new();
277 if issuer_ref_defined(
278 &self.config.default_certificate_specs.balancerd_external,
279 &balancer.spec.external_certificate_spec,
280 ) {
281 volumes.push(Volume {
282 name: "external-certificate".to_owned(),
283 secret: Some(SecretVolumeSource {
284 default_mode: Some(0o400),
285 secret_name: Some(balancer.external_certificate_secret_name()),
286 items: None,
287 optional: Some(false),
288 }),
289 ..Default::default()
290 });
291 volume_mounts.push(VolumeMount {
292 name: "external-certificate".to_owned(),
293 mount_path: "/etc/external_tls".to_owned(),
294 read_only: Some(true),
295 ..Default::default()
296 });
297 args.extend([
298 "--tls-mode=require".into(),
299 "--tls-cert=/etc/external_tls/tls.crt".into(),
300 "--tls-key=/etc/external_tls/tls.key".into(),
301 ]);
302 } else {
303 args.push("--tls-mode=disable".to_string());
304 }
305
306 let startup_probe = Probe {
307 http_get: Some(HTTPGetAction {
308 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
309 path: Some("/api/readyz".into()),
310 ..Default::default()
311 }),
312 failure_threshold: Some(20),
313 initial_delay_seconds: Some(3),
314 period_seconds: Some(3),
315 success_threshold: Some(1),
316 timeout_seconds: Some(1),
317 ..Default::default()
318 };
319 let readiness_probe = Probe {
320 http_get: Some(HTTPGetAction {
321 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
322 path: Some("/api/readyz".into()),
323 ..Default::default()
324 }),
325 failure_threshold: Some(3),
326 period_seconds: Some(10),
327 success_threshold: Some(1),
328 timeout_seconds: Some(1),
329 ..Default::default()
330 };
331 let liveness_probe = Probe {
332 http_get: Some(HTTPGetAction {
333 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
334 path: Some("/api/livez".into()),
335 ..Default::default()
336 }),
337 failure_threshold: Some(3),
338 initial_delay_seconds: Some(8),
339 period_seconds: Some(10),
340 success_threshold: Some(1),
341 timeout_seconds: Some(1),
342 ..Default::default()
343 };
344
345 let container = Container {
346 name: "balancerd".to_owned(),
347 image: Some(balancer.spec.balancerd_image_ref.clone()),
348 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
349 ports: Some(ports),
350 args: Some(args),
351 startup_probe: Some(startup_probe),
352 readiness_probe: Some(readiness_probe),
353 liveness_probe: Some(liveness_probe),
354 resources: balancer
355 .spec
356 .resource_requirements
357 .clone()
358 .or_else(|| self.config.balancerd_default_resources.clone()),
359 security_context: security_context.clone(),
360 volume_mounts: Some(volume_mounts),
361 ..Default::default()
362 };
363
364 let deployment_spec = DeploymentSpec {
365 replicas: Some(balancer.replicas()),
366 selector: LabelSelector {
367 match_labels: Some(pod_template_labels.clone()),
368 ..Default::default()
369 },
370 strategy: Some(DeploymentStrategy {
371 rolling_update: Some(RollingUpdateDeployment {
372 max_surge: Some(IntOrString::String("100%".into())),
376 ..Default::default()
377 }),
378 ..Default::default()
379 }),
380 template: PodTemplateSpec {
381 metadata: Some(ObjectMeta {
384 annotations: pod_template_annotations,
385 labels: Some(pod_template_labels),
386 ..Default::default()
387 }),
388 spec: Some(PodSpec {
389 containers: vec![container],
390 node_selector: Some(
391 self.config
392 .balancerd_node_selector
393 .iter()
394 .map(|selector| (selector.key.clone(), selector.value.clone()))
395 .collect(),
396 ),
397 affinity: self.config.balancerd_affinity.clone(),
398 tolerations: self.config.balancerd_tolerations.clone(),
399 security_context: Some(PodSecurityContext {
400 fs_group: Some(999),
401 run_as_user: Some(999),
402 run_as_group: Some(999),
403 ..Default::default()
404 }),
405 scheduler_name: self.config.scheduler_name.clone(),
406 volumes: Some(volumes),
407 ..Default::default()
408 }),
409 },
410 ..Default::default()
411 };
412
413 Ok(Deployment {
414 metadata: balancer.managed_resource_meta(balancer.deployment_name()),
415 spec: Some(deployment_spec),
416 status: None,
417 })
418 }
419
420 fn create_service_object(&self, balancer: &Balancer) -> Service {
421 let selector =
422 btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
423
424 let ports = vec![
425 ServicePort {
426 name: Some("http".to_string()),
427 protocol: Some("TCP".to_string()),
428 port: self.config.balancerd_http_port.into(),
429 target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
430 ..Default::default()
431 },
432 ServicePort {
433 name: Some("pgwire".to_string()),
434 protocol: Some("TCP".to_string()),
435 port: self.config.balancerd_sql_port.into(),
436 target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
437 ..Default::default()
438 },
439 ];
440
441 let spec = ServiceSpec {
442 type_: Some("ClusterIP".to_string()),
443 cluster_ip: Some("None".to_string()),
444 selector: Some(selector),
445 ports: Some(ports),
446 ..Default::default()
447 };
448
449 Service {
450 metadata: balancer.managed_resource_meta(balancer.service_name()),
451 spec: Some(spec),
452 status: None,
453 }
454 }
455
456 async fn fix_deployment(
459 &self,
460 deployment_api: &Api<Deployment>,
461 new_deployment: &Deployment,
462 ) -> Result<(), Error> {
463 let Some(mut existing_deployment) = self
464 .deployments
465 .get(
466 &ObjectRef::new(&new_deployment.name_unchecked())
467 .within(&new_deployment.namespace().unwrap()),
468 )
469 .map(Arc::unwrap_or_clone)
470 else {
471 return Ok(());
472 };
473
474 if existing_deployment.spec.as_ref().unwrap().selector
475 == new_deployment.spec.as_ref().unwrap().selector
476 {
477 return Ok(());
478 }
479
480 warn!("found existing deployment with old label selector, fixing");
481
482 existing_deployment
485 .spec
486 .as_mut()
487 .unwrap()
488 .template
489 .metadata
490 .as_mut()
491 .unwrap()
492 .labels = new_deployment
493 .spec
494 .as_ref()
495 .unwrap()
496 .template
497 .metadata
498 .as_ref()
499 .unwrap()
500 .labels
501 .clone();
502
503 replace_resource(deployment_api, &existing_deployment).await?;
507 await_condition(
508 deployment_api.clone(),
509 &existing_deployment.name_unchecked(),
510 |deployment: Option<&Deployment>| {
511 let observed_generation = deployment
512 .and_then(|deployment| deployment.status.as_ref())
513 .and_then(|status| status.observed_generation)
514 .unwrap_or(0);
515 let current_generation = deployment
516 .and_then(|deployment| deployment.meta().generation)
517 .unwrap_or(0);
518 let previous_generation = existing_deployment.meta().generation.unwrap_or(0);
519 observed_generation == current_generation
520 && current_generation > previous_generation
521 },
522 )
523 .await
524 .map_err(|e| anyhow::anyhow!(e))?;
525 await_condition(
526 deployment_api.clone(),
527 &existing_deployment.name_unchecked(),
528 is_deployment_completed(),
529 )
530 .await
531 .map_err(|e| anyhow::anyhow!(e))?;
532
533 match kube::runtime::wait::delete::delete_and_finalize(
536 deployment_api.clone(),
537 &existing_deployment.name_unchecked(),
538 &DeleteParams::orphan(),
539 )
540 .await
541 {
542 Ok(_) => {}
543 Err(kube::runtime::wait::delete::Error::Delete(kube::Error::Api(e)))
544 if e.code == 404 =>
545 {
546 }
548 Err(e) => return Err(anyhow::anyhow!(e).into()),
549 }
550
551 Ok(())
557 }
558}
559
560#[async_trait::async_trait]
561impl k8s_controller::Context for Context {
562 type Resource = Balancer;
563 type Error = Error;
564
565 #[instrument(fields())]
566 async fn apply(
567 &self,
568 client: Client,
569 balancer: &Self::Resource,
570 ) -> Result<Option<Action>, Self::Error> {
571 if balancer.status.is_none() {
572 let balancer_api: Api<Balancer> =
573 Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
574 let mut new_balancer = balancer.clone();
575 new_balancer.status = Some(balancer.status());
576 balancer_api
577 .replace_status(
578 &balancer.name_unchecked(),
579 &PostParams::default(),
580 serde_json::to_vec(&new_balancer).unwrap(),
581 )
582 .await?;
583 return Ok(None);
586 }
587
588 let namespace = balancer.namespace();
589 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
590 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
591 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
592
593 if let Some(external_certificate) = self.create_external_certificate_object(balancer) {
594 trace!("creating new balancerd external certificate");
595 apply_resource(&certificate_api, &external_certificate).await?;
596 }
597
598 let deployment = self.create_deployment_object(balancer)?;
599 self.fix_deployment(&deployment_api, &deployment).await?;
600 trace!("creating new balancerd deployment");
601 apply_resource(&deployment_api, &deployment).await?;
602
603 let service = self.create_service_object(balancer);
604 trace!("creating new balancerd service");
605 apply_resource(&service_api, &service).await?;
606
607 self.sync_deployment_status(&client, balancer).await?;
608
609 Ok(None)
610 }
611}