1use anyhow::bail;
11use k8s_openapi::{
12 api::{
13 apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy, RollingUpdateDeployment},
14 core::v1::{
15 Affinity, Capabilities, Container, ContainerPort, HTTPGetAction, PodSecurityContext,
16 PodSpec, PodTemplateSpec, Probe, ResourceRequirements, SeccompProfile,
17 SecretVolumeSource, SecurityContext, Service, ServicePort, ServiceSpec, Toleration,
18 Volume, VolumeMount,
19 },
20 },
21 apimachinery::pkg::{
22 apis::meta::v1::{Condition, LabelSelector, Time},
23 util::intstr::IntOrString,
24 },
25};
26use kube::{
27 Api, Client, Resource, ResourceExt,
28 api::{ObjectMeta, PostParams},
29 runtime::{
30 controller::Action,
31 reflector::{ObjectRef, Store},
32 },
33};
34use maplit::btreemap;
35use tracing::trace;
36
37use crate::{
38 Error,
39 k8s::{apply_resource, make_reflector},
40 tls::{DefaultCertificateSpecs, create_certificate, issuer_ref_defined},
41};
42use mz_cloud_resources::crd::{
43 ManagedResource,
44 balancer::v1alpha1::{Balancer, Routing},
45 generated::cert_manager::certificates::{Certificate, CertificatePrivateKeyAlgorithm},
46};
47use mz_orchestrator_kubernetes::KubernetesImagePullPolicy;
48use mz_ore::{cli::KeyValueArg, instrument};
49
/// Settings consumed by [`Context`] when it builds the Kubernetes objects
/// (certificate, deployment, service) for a `Balancer`.
pub struct Config {
    /// When true, the balancerd container gets a restrictive
    /// `SecurityContext` (non-root, all capabilities dropped,
    /// `RuntimeDefault` seccomp profile, no privilege escalation).
    pub enable_security_context: bool,
    /// When true, `prometheus.io/*` scrape annotations are added to the
    /// balancerd pod template.
    pub enable_prometheus_scrape_annotations: bool,

    /// Image pull policy set on the balancerd container.
    pub image_pull_policy: KubernetesImagePullPolicy,
    /// Scheduler name copied into the balancerd pod spec, if any.
    pub scheduler_name: Option<String>,
    /// Key/value pairs collected into the balancerd pod's node selector.
    pub balancerd_node_selector: Vec<KeyValueArg<String, String>>,
    /// Affinity copied into the balancerd pod spec, if any.
    pub balancerd_affinity: Option<Affinity>,
    /// Tolerations copied into the balancerd pod spec, if any.
    pub balancerd_tolerations: Option<Vec<Toleration>>,
    /// Container resource requirements used when the `Balancer` spec does
    /// not provide its own.
    pub balancerd_default_resources: Option<ResourceRequirements>,

    /// Default certificate specs; the `balancerd_external` and `internal`
    /// entries are the ones consulted by this controller.
    pub default_certificate_specs: DefaultCertificateSpecs,

    /// environmentd port used in the `--static-resolver-addr` argument.
    pub environmentd_sql_port: u16,
    /// environmentd port used in the `--https-resolver-template` argument.
    pub environmentd_http_port: u16,
    /// Port balancerd listens on for pgwire; also exposed by the service.
    pub balancerd_sql_port: u16,
    /// Port balancerd listens on for HTTPS; also exposed by the service.
    pub balancerd_http_port: u16,
    /// Port balancerd listens on for internal HTTP; targeted by the
    /// container probes and the Prometheus port annotation.
    pub balancerd_internal_http_port: u16,
}
69
/// Controller state for reconciling `Balancer` resources.
pub struct Context {
    /// Settings used when building the managed Kubernetes objects.
    config: Config,
    /// Store of `Deployment` objects produced by `make_reflector`; read when
    /// syncing the balancer's `Ready` condition.
    deployments: Store<Deployment>,
}
74
75impl Context {
76 pub async fn new(config: Config, client: Client) -> Self {
77 Self {
78 config,
79 deployments: make_reflector(client).await,
80 }
81 }
82
83 async fn sync_deployment_status(
84 &self,
85 client: &Client,
86 balancer: &Balancer,
87 ) -> Result<(), kube::Error> {
88 let namespace = balancer.namespace().unwrap();
89 let balancer_api: Api<Balancer> = Api::namespaced(client.clone(), &namespace);
90
91 let Some(deployment) = self
92 .deployments
93 .get(&ObjectRef::new(&balancer.deployment_name()).within(&namespace))
94 else {
95 return Ok(());
96 };
97
98 let Some(deployment_conditions) = &deployment
99 .status
100 .as_ref()
101 .and_then(|status| status.conditions.as_ref())
102 else {
103 return Ok(());
106 };
107
108 let ready = deployment_conditions
109 .iter()
110 .any(|condition| condition.type_ == "Available" && condition.status == "True");
111 let ready_str = if ready { "True" } else { "False" };
112
113 let mut status = balancer.status.clone().unwrap();
114 if status
115 .conditions
116 .iter()
117 .any(|condition| condition.type_ == "Ready" && condition.status == ready_str)
118 {
119 return Ok(());
123 }
124
125 status.conditions = vec![Condition {
126 type_: "Ready".to_string(),
127 status: ready_str.to_string(),
128 last_transition_time: Time(chrono::offset::Utc::now()),
129 message: format!(
130 "balancerd deployment is{} ready",
131 if ready { "" } else { " not" }
132 ),
133 observed_generation: None,
134 reason: "DeploymentStatus".to_string(),
135 }];
136 let mut new_balancer = balancer.clone();
137 new_balancer.status = Some(status);
138
139 balancer_api
140 .replace_status(
141 &balancer.name_unchecked(),
142 &PostParams::default(),
143 serde_json::to_vec(&new_balancer).unwrap(),
144 )
145 .await?;
146
147 Ok(())
148 }
149
150 fn create_external_certificate_object(&self, balancer: &Balancer) -> Option<Certificate> {
151 create_certificate(
152 self.config
153 .default_certificate_specs
154 .balancerd_external
155 .clone(),
156 balancer,
157 balancer.spec.external_certificate_spec.clone(),
158 balancer.external_certificate_name(),
159 balancer.external_certificate_secret_name(),
160 None,
161 CertificatePrivateKeyAlgorithm::Rsa,
162 Some(4096),
163 )
164 }
165
166 fn create_deployment_object(&self, balancer: &Balancer) -> anyhow::Result<Deployment> {
167 let security_context = if self.config.enable_security_context {
168 Some(SecurityContext {
172 run_as_non_root: Some(true),
173 capabilities: Some(Capabilities {
174 drop: Some(vec!["ALL".to_string()]),
175 ..Default::default()
176 }),
177 seccomp_profile: Some(SeccompProfile {
178 type_: "RuntimeDefault".to_string(),
179 ..Default::default()
180 }),
181 allow_privilege_escalation: Some(false),
182 ..Default::default()
183 })
184 } else {
185 None
186 };
187
188 let pod_template_annotations = if self.config.enable_prometheus_scrape_annotations {
189 Some(btreemap! {
190 "prometheus.io/scrape".to_owned() => "true".to_string(),
191 "prometheus.io/port".to_owned() => self.config.balancerd_internal_http_port.to_string(),
192 "prometheus.io/path".to_owned() => "/metrics".to_string(),
193 "prometheus.io/scheme".to_owned() => "http".to_string(),
194 })
195 } else {
196 None
197 };
198 let mut pod_template_labels = balancer.default_labels();
199 pod_template_labels.insert(
200 "materialize.cloud/name".to_owned(),
201 balancer.deployment_name(),
202 );
203 pod_template_labels.insert("app".to_owned(), "balancerd".to_string());
204 pod_template_labels.insert("materialize.cloud/app".to_owned(), balancer.app_name());
205
206 let ports = vec![
207 ContainerPort {
208 container_port: self.config.balancerd_sql_port.into(),
209 name: Some("pgwire".into()),
210 protocol: Some("TCP".into()),
211 ..Default::default()
212 },
213 ContainerPort {
214 container_port: self.config.balancerd_http_port.into(),
215 name: Some("http".into()),
216 protocol: Some("TCP".into()),
217 ..Default::default()
218 },
219 ContainerPort {
220 container_port: self.config.balancerd_internal_http_port.into(),
221 name: Some("internal-http".into()),
222 protocol: Some("TCP".into()),
223 ..Default::default()
224 },
225 ];
226
227 let mut args = vec![
228 "service".to_string(),
229 format!(
230 "--pgwire-listen-addr=0.0.0.0:{}",
231 self.config.balancerd_sql_port
232 ),
233 format!(
234 "--https-listen-addr=0.0.0.0:{}",
235 self.config.balancerd_http_port
236 ),
237 format!(
238 "--internal-http-listen-addr=0.0.0.0:{}",
239 self.config.balancerd_internal_http_port
240 ),
241 ];
242 match balancer.routing()? {
243 Routing::Static(static_routing_config) => {
244 args.extend([
245 format!(
246 "--https-resolver-template={}.{}.svc.cluster.local:{}",
247 static_routing_config.environmentd_service_name,
248 static_routing_config.environmentd_namespace,
249 self.config.environmentd_http_port
250 ),
251 format!(
252 "--static-resolver-addr={}.{}.svc.cluster.local:{}",
253 static_routing_config.environmentd_service_name,
254 static_routing_config.environmentd_namespace,
255 self.config.environmentd_sql_port
256 ),
257 ]);
258 }
259 Routing::Frontegg(_frontegg_routing_config) => {
260 bail!("frontegg routing is not yet implemented");
261 }
262 }
263
264 if issuer_ref_defined(
265 &self.config.default_certificate_specs.internal,
266 &balancer.spec.internal_certificate_spec,
267 ) {
268 args.push("--internal-tls".to_owned())
269 }
270
271 let mut volumes = Vec::new();
272 let mut volume_mounts = Vec::new();
273 if issuer_ref_defined(
274 &self.config.default_certificate_specs.balancerd_external,
275 &balancer.spec.external_certificate_spec,
276 ) {
277 volumes.push(Volume {
278 name: "external-certificate".to_owned(),
279 secret: Some(SecretVolumeSource {
280 default_mode: Some(0o400),
281 secret_name: Some(balancer.external_certificate_secret_name()),
282 items: None,
283 optional: Some(false),
284 }),
285 ..Default::default()
286 });
287 volume_mounts.push(VolumeMount {
288 name: "external-certificate".to_owned(),
289 mount_path: "/etc/external_tls".to_owned(),
290 read_only: Some(true),
291 ..Default::default()
292 });
293 args.extend([
294 "--tls-mode=require".into(),
295 "--tls-cert=/etc/external_tls/tls.crt".into(),
296 "--tls-key=/etc/external_tls/tls.key".into(),
297 ]);
298 } else {
299 args.push("--tls-mode=disable".to_string());
300 }
301
302 let startup_probe = Probe {
303 http_get: Some(HTTPGetAction {
304 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
305 path: Some("/api/readyz".into()),
306 ..Default::default()
307 }),
308 failure_threshold: Some(20),
309 initial_delay_seconds: Some(3),
310 period_seconds: Some(3),
311 success_threshold: Some(1),
312 timeout_seconds: Some(1),
313 ..Default::default()
314 };
315 let readiness_probe = Probe {
316 http_get: Some(HTTPGetAction {
317 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
318 path: Some("/api/readyz".into()),
319 ..Default::default()
320 }),
321 failure_threshold: Some(3),
322 period_seconds: Some(10),
323 success_threshold: Some(1),
324 timeout_seconds: Some(1),
325 ..Default::default()
326 };
327 let liveness_probe = Probe {
328 http_get: Some(HTTPGetAction {
329 port: IntOrString::Int(self.config.balancerd_internal_http_port.into()),
330 path: Some("/api/livez".into()),
331 ..Default::default()
332 }),
333 failure_threshold: Some(3),
334 initial_delay_seconds: Some(8),
335 period_seconds: Some(10),
336 success_threshold: Some(1),
337 timeout_seconds: Some(1),
338 ..Default::default()
339 };
340
341 let container = Container {
342 name: "balancerd".to_owned(),
343 image: Some(balancer.spec.balancerd_image_ref.clone()),
344 image_pull_policy: Some(self.config.image_pull_policy.to_string()),
345 ports: Some(ports),
346 args: Some(args),
347 startup_probe: Some(startup_probe),
348 readiness_probe: Some(readiness_probe),
349 liveness_probe: Some(liveness_probe),
350 resources: balancer
351 .spec
352 .resource_requirements
353 .clone()
354 .or_else(|| self.config.balancerd_default_resources.clone()),
355 security_context: security_context.clone(),
356 volume_mounts: Some(volume_mounts),
357 ..Default::default()
358 };
359
360 let deployment_spec = DeploymentSpec {
361 replicas: Some(balancer.replicas()),
362 selector: LabelSelector {
363 match_labels: Some(pod_template_labels.clone()),
364 ..Default::default()
365 },
366 strategy: Some(DeploymentStrategy {
367 rolling_update: Some(RollingUpdateDeployment {
368 max_surge: Some(IntOrString::String("100%".into())),
372 ..Default::default()
373 }),
374 ..Default::default()
375 }),
376 template: PodTemplateSpec {
377 metadata: Some(ObjectMeta {
380 annotations: pod_template_annotations,
381 labels: Some(pod_template_labels),
382 ..Default::default()
383 }),
384 spec: Some(PodSpec {
385 containers: vec![container],
386 node_selector: Some(
387 self.config
388 .balancerd_node_selector
389 .iter()
390 .map(|selector| (selector.key.clone(), selector.value.clone()))
391 .collect(),
392 ),
393 affinity: self.config.balancerd_affinity.clone(),
394 tolerations: self.config.balancerd_tolerations.clone(),
395 security_context: Some(PodSecurityContext {
396 fs_group: Some(999),
397 run_as_user: Some(999),
398 run_as_group: Some(999),
399 ..Default::default()
400 }),
401 scheduler_name: self.config.scheduler_name.clone(),
402 volumes: Some(volumes),
403 ..Default::default()
404 }),
405 },
406 ..Default::default()
407 };
408
409 Ok(Deployment {
410 metadata: balancer.managed_resource_meta(balancer.deployment_name()),
411 spec: Some(deployment_spec),
412 status: None,
413 })
414 }
415
416 fn create_service_object(&self, balancer: &Balancer) -> Service {
417 let selector =
418 btreemap! {"materialize.cloud/name".to_string() => balancer.deployment_name()};
419
420 let ports = vec![
421 ServicePort {
422 name: Some("http".to_string()),
423 protocol: Some("TCP".to_string()),
424 port: self.config.balancerd_http_port.into(),
425 target_port: Some(IntOrString::Int(self.config.balancerd_http_port.into())),
426 ..Default::default()
427 },
428 ServicePort {
429 name: Some("pgwire".to_string()),
430 protocol: Some("TCP".to_string()),
431 port: self.config.balancerd_sql_port.into(),
432 target_port: Some(IntOrString::Int(self.config.balancerd_sql_port.into())),
433 ..Default::default()
434 },
435 ];
436
437 let spec = ServiceSpec {
438 type_: Some("ClusterIP".to_string()),
439 cluster_ip: Some("None".to_string()),
440 selector: Some(selector),
441 ports: Some(ports),
442 ..Default::default()
443 };
444
445 Service {
446 metadata: balancer.managed_resource_meta(balancer.service_name()),
447 spec: Some(spec),
448 status: None,
449 }
450 }
451}
452
453#[async_trait::async_trait]
454impl k8s_controller::Context for Context {
455 type Resource = Balancer;
456 type Error = Error;
457
458 #[instrument(fields())]
459 async fn apply(
460 &self,
461 client: Client,
462 balancer: &Self::Resource,
463 ) -> Result<Option<Action>, Self::Error> {
464 if balancer.status.is_none() {
465 let balancer_api: Api<Balancer> =
466 Api::namespaced(client.clone(), &balancer.meta().namespace.clone().unwrap());
467 let mut new_balancer = balancer.clone();
468 new_balancer.status = Some(balancer.status());
469 balancer_api
470 .replace_status(
471 &balancer.name_unchecked(),
472 &PostParams::default(),
473 serde_json::to_vec(&new_balancer).unwrap(),
474 )
475 .await?;
476 return Ok(None);
479 }
480
481 let namespace = balancer.namespace().unwrap();
482 let certificate_api: Api<Certificate> = Api::namespaced(client.clone(), &namespace);
483 let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &namespace);
484 let service_api: Api<Service> = Api::namespaced(client.clone(), &namespace);
485
486 if let Some(external_certificate) = self.create_external_certificate_object(balancer) {
487 trace!("creating new balancerd external certificate");
488 apply_resource(&certificate_api, &external_certificate).await?;
489 }
490
491 let deployment = self.create_deployment_object(balancer)?;
492 trace!("creating new balancerd deployment");
493 apply_resource(&deployment_api, &deployment).await?;
494
495 let service = self.create_service_object(balancer);
496 trace!("creating new balancerd service");
497 apply_resource(&service_api, &service).await?;
498
499 self.sync_deployment_status(&client, balancer).await?;
500
501 Ok(None)
502 }
503}