1use std::{
11 future,
12 net::SocketAddr,
13 sync::{Arc, LazyLock},
14};
15
16use http::HeaderValue;
17use k8s_openapi::{
18 api::{
19 apps::v1::Deployment,
20 core::v1::{Affinity, ConfigMap, ResourceRequirements, Service, Toleration},
21 networking::v1::NetworkPolicy,
22 },
23 apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition,
24};
25use kube::{Api, runtime::watcher};
26use mz_cloud_provider::CloudProvider;
27use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
28use tracing::info;
29
30use mz_build_info::{BuildInfo, build_info};
31use mz_orchestrator_kubernetes::{KubernetesImagePullPolicy, util::create_client};
32use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
33use mz_orchestratord::{
34 controller,
35 k8s::register_crds,
36 metrics::{self, Metrics},
37 tls::DefaultCertificateSpecs,
38};
39use mz_ore::{
40 cli::{self, CliConfig, KeyValueArg},
41 error::ErrorExt,
42 metrics::MetricsRegistry,
43};
44
/// Build metadata for this binary, produced by the `build_info!` macro.
///
/// Passed to tracing setup and the profiling router in `run`.
const BUILD_INFO: BuildInfo = build_info!();
/// Human-readable version string derived from [`BUILD_INFO`], computed on
/// first access; used as the `--version` output of [`Args`].
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
47
48#[derive(clap::Parser)]
49#[clap(name = "orchestratord", version = VERSION.as_str())]
50pub struct Args {
51 #[structopt(long, env = "KUBERNETES_CONTEXT", default_value = "minikube")]
52 kubernetes_context: String,
53
54 #[clap(long, default_value = "[::]:8004")]
55 profiling_listen_address: SocketAddr,
56 #[clap(long, default_value = "[::]:3100")]
57 metrics_listen_address: SocketAddr,
58
59 #[clap(long)]
60 cloud_provider: CloudProvider,
61 #[clap(long)]
62 region: String,
63 #[clap(long)]
64 create_balancers: bool,
65 #[clap(long)]
66 create_console: bool,
67 #[clap(long)]
68 helm_chart_version: Option<String>,
69 #[clap(long, default_value = "kubernetes")]
70 secrets_controller: String,
71 #[clap(long)]
72 collect_pod_metrics: bool,
73 #[clap(long)]
74 enable_prometheus_scrape_annotations: bool,
75 #[clap(long)]
76 disable_authentication: bool,
77
78 #[clap(long)]
79 segment_api_key: Option<String>,
80 #[clap(long)]
81 segment_client_side: bool,
82
83 #[clap(long)]
84 console_image_tag_default: String,
85 #[clap(long)]
86 console_image_tag_map: Vec<KeyValueArg<String, String>>,
87
88 #[clap(long)]
89 aws_account_id: Option<String>,
90 #[clap(long)]
91 environmentd_iam_role_arn: Option<String>,
92 #[clap(long)]
93 environmentd_connection_role_arn: Option<String>,
94 #[clap(long)]
95 aws_secrets_controller_tags: Vec<String>,
96 #[clap(long)]
97 environmentd_availability_zones: Option<Vec<String>>,
98
99 #[clap(long)]
100 ephemeral_volume_class: Option<String>,
101 #[clap(long)]
102 scheduler_name: Option<String>,
103 #[clap(long)]
104 enable_security_context: bool,
105 #[clap(long)]
106 enable_internal_statement_logging: bool,
107 #[clap(long, default_value = "false")]
108 disable_statement_logging: bool,
109
110 #[clap(long)]
111 orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
112 #[clap(long)]
113 environmentd_node_selector: Vec<KeyValueArg<String, String>>,
114 #[clap(long, value_parser = parse_affinity)]
115 environmentd_affinity: Option<Affinity>,
116 #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
117 environmentd_tolerations: Option<Vec<Toleration>>,
118 #[clap(long, value_parser = parse_resources)]
119 environmentd_default_resources: Option<ResourceRequirements>,
120 #[clap(long)]
121 clusterd_node_selector: Vec<KeyValueArg<String, String>>,
122 #[clap(long, value_parser = parse_affinity)]
123 clusterd_affinity: Option<Affinity>,
124 #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
125 clusterd_tolerations: Option<Vec<Toleration>>,
126 #[clap(long)]
127 balancerd_node_selector: Vec<KeyValueArg<String, String>>,
128 #[clap(long, value_parser = parse_affinity)]
129 balancerd_affinity: Option<Affinity>,
130 #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
131 balancerd_tolerations: Option<Vec<Toleration>>,
132 #[clap(long, value_parser = parse_resources)]
133 balancerd_default_resources: Option<ResourceRequirements>,
134 #[clap(long)]
135 console_node_selector: Vec<KeyValueArg<String, String>>,
136 #[clap(long, value_parser = parse_affinity)]
137 console_affinity: Option<Affinity>,
138 #[clap(long = "console-toleration", value_parser = parse_tolerations)]
139 console_tolerations: Option<Vec<Toleration>>,
140 #[clap(long, value_parser = parse_resources)]
141 console_default_resources: Option<ResourceRequirements>,
142 #[clap(long, default_value = "always", value_enum)]
143 image_pull_policy: KubernetesImagePullPolicy,
144 #[clap(long)]
145 network_policies_internal_enabled: bool,
146 #[clap(long)]
147 network_policies_ingress_enabled: bool,
148 #[clap(long)]
149 network_policies_ingress_cidrs: Vec<String>,
150 #[clap(long)]
151 network_policies_egress_enabled: bool,
152 #[clap(long)]
153 network_policies_egress_cidrs: Vec<String>,
154
155 #[clap(long)]
156 environmentd_cluster_replica_sizes: Option<String>,
157 #[clap(long)]
158 bootstrap_default_cluster_replica_size: Option<String>,
159 #[clap(long)]
160 bootstrap_builtin_system_cluster_replica_size: Option<String>,
161 #[clap(long)]
162 bootstrap_builtin_probe_cluster_replica_size: Option<String>,
163 #[clap(long)]
164 bootstrap_builtin_support_cluster_replica_size: Option<String>,
165 #[clap(long)]
166 bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
167 #[clap(long)]
168 bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
169 #[clap(long)]
170 bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
171 #[clap(long)]
172 bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
173 #[clap(long)]
174 bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
175 #[clap(long)]
176 bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
177
178 #[clap(
179 long,
180 default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
181 )]
182 environmentd_allowed_origins: Vec<HeaderValue>,
183 #[clap(long, default_value = "https://console.materialize.com")]
184 internal_console_proxy_url: String,
185
186 #[clap(long, default_value = "6875")]
187 environmentd_sql_port: u16,
188 #[clap(long, default_value = "6876")]
189 environmentd_http_port: u16,
190 #[clap(long, default_value = "6877")]
191 environmentd_internal_sql_port: u16,
192 #[clap(long, default_value = "6878")]
193 environmentd_internal_http_port: u16,
194 #[clap(long, default_value = "6879")]
195 environmentd_internal_persist_pubsub_port: u16,
196
197 #[clap(long, default_value = "6875")]
198 balancerd_sql_port: u16,
199 #[clap(long, default_value = "6876")]
200 balancerd_http_port: u16,
201 #[clap(long, default_value = "8080")]
202 balancerd_internal_http_port: u16,
203
204 #[clap(long, default_value = "8080")]
205 console_http_port: u16,
206
207 #[clap(long, default_value = "{}")]
208 default_certificate_specs: DefaultCertificateSpecs,
209
210 #[clap(long, hide = true)]
211 disable_license_key_checks: bool,
212
213 #[clap(flatten)]
214 tracing: TracingCliArgs,
215
216 #[clap(long, hide = true, value_parser(parse_crd_columns))]
217 additional_crd_columns: Option<std::vec::Vec<CustomResourceColumnDefinition>>,
218}
219
220fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
221 Ok(serde_json::from_str(s)?)
222}
223
224fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
225 Ok(serde_json::from_str(s)?)
226}
227
228fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
229 Ok(serde_json::from_str(s)?)
230}
231
232fn parse_crd_columns(val: &str) -> Result<Vec<CustomResourceColumnDefinition>, serde_json::Error> {
233 serde_json::from_str(val)
234}
235
236#[tokio::main]
237async fn main() {
238 mz_ore::panic::install_enhanced_handler();
239
240 let args = cli::parse_args(CliConfig {
241 env_prefix: Some("ORCHESTRATORD_"),
242 enable_version_flag: true,
243 });
244 if let Err(err) = run(args).await {
245 panic!("orchestratord: fatal: {}", err.display_with_causes());
246 }
247}
248
249async fn run(args: Args) -> Result<(), anyhow::Error> {
250 let metrics_registry = MetricsRegistry::new();
251 args.tracing
252 .configure_tracing(
253 StaticTracingConfig {
254 service_name: "orchestratord",
255 build_info: BUILD_INFO,
256 },
257 metrics_registry.clone(),
258 )
259 .await?;
260
261 let metrics = Arc::new(Metrics::register_into(&metrics_registry));
262
263 let (client, namespace) = create_client(args.kubernetes_context.clone()).await?;
264 register_crds(
265 client.clone(),
266 args.additional_crd_columns.unwrap_or_default(),
267 )
268 .await?;
269
270 {
271 let router = mz_prof_http::router(&BUILD_INFO);
272 let address = args.profiling_listen_address.clone();
273 mz_ore::task::spawn(|| "profiling API internal web server", async move {
274 if let Err(e) = axum::serve(
275 tokio::net::TcpListener::bind(&address).await.unwrap(),
276 router.into_make_service(),
277 )
278 .await
279 {
280 panic!(
281 "profiling API internal web server failed: {}",
282 e.display_with_causes()
283 );
284 }
285 });
286 }
287
288 {
289 let router = metrics::router(metrics_registry.clone());
290 let address = args.metrics_listen_address.clone();
291 mz_ore::task::spawn(|| "metrics server", async move {
292 if let Err(e) = axum::serve(
293 tokio::net::TcpListener::bind(&address).await.unwrap(),
294 router.into_make_service(),
295 )
296 .await
297 {
298 panic!("metrics server failed: {}", e.display_with_causes());
299 }
300 });
301 }
302
303 mz_ore::task::spawn(
304 || "materialize controller",
305 k8s_controller::Controller::namespaced_all(
306 client.clone(),
307 controller::materialize::Context::new(
308 controller::materialize::Config {
309 cloud_provider: args.cloud_provider,
310 region: args.region,
311 create_balancers: args.create_balancers,
312 create_console: args.create_console,
313 helm_chart_version: args.helm_chart_version,
314 secrets_controller: args.secrets_controller,
315 collect_pod_metrics: args.collect_pod_metrics,
316 enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
317 segment_api_key: args.segment_api_key,
318 segment_client_side: args.segment_client_side,
319 console_image_tag_default: args.console_image_tag_default,
320 console_image_tag_map: args.console_image_tag_map,
321 aws_account_id: args.aws_account_id,
322 environmentd_iam_role_arn: args.environmentd_iam_role_arn,
323 environmentd_connection_role_arn: args.environmentd_connection_role_arn,
324 aws_secrets_controller_tags: args.aws_secrets_controller_tags,
325 environmentd_availability_zones: args.environmentd_availability_zones,
326 ephemeral_volume_class: args.ephemeral_volume_class,
327 scheduler_name: args.scheduler_name.clone(),
328 enable_security_context: args.enable_security_context,
329 enable_internal_statement_logging: args.enable_internal_statement_logging,
330 disable_statement_logging: args.disable_statement_logging,
331 orchestratord_pod_selector_labels: args.orchestratord_pod_selector_labels,
332 environmentd_node_selector: args.environmentd_node_selector,
333 environmentd_affinity: args.environmentd_affinity,
334 environmentd_tolerations: args.environmentd_tolerations,
335 environmentd_default_resources: args.environmentd_default_resources,
336 clusterd_node_selector: args.clusterd_node_selector,
337 clusterd_affinity: args.clusterd_affinity,
338 clusterd_tolerations: args.clusterd_tolerations,
339 image_pull_policy: args.image_pull_policy,
340 network_policies_internal_enabled: args.network_policies_internal_enabled,
341 network_policies_ingress_enabled: args.network_policies_ingress_enabled,
342 network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(),
343 network_policies_egress_enabled: args.network_policies_egress_enabled,
344 network_policies_egress_cidrs: args.network_policies_egress_cidrs,
345 environmentd_cluster_replica_sizes: args.environmentd_cluster_replica_sizes,
346 bootstrap_default_cluster_replica_size: args
347 .bootstrap_default_cluster_replica_size,
348 bootstrap_builtin_system_cluster_replica_size: args
349 .bootstrap_builtin_system_cluster_replica_size,
350 bootstrap_builtin_probe_cluster_replica_size: args
351 .bootstrap_builtin_probe_cluster_replica_size,
352 bootstrap_builtin_support_cluster_replica_size: args
353 .bootstrap_builtin_support_cluster_replica_size,
354 bootstrap_builtin_catalog_server_cluster_replica_size: args
355 .bootstrap_builtin_catalog_server_cluster_replica_size,
356 bootstrap_builtin_analytics_cluster_replica_size: args
357 .bootstrap_builtin_analytics_cluster_replica_size,
358 bootstrap_builtin_system_cluster_replication_factor: args
359 .bootstrap_builtin_system_cluster_replication_factor,
360 bootstrap_builtin_probe_cluster_replication_factor: args
361 .bootstrap_builtin_probe_cluster_replication_factor,
362 bootstrap_builtin_support_cluster_replication_factor: args
363 .bootstrap_builtin_support_cluster_replication_factor,
364 bootstrap_builtin_analytics_cluster_replication_factor: args
365 .bootstrap_builtin_analytics_cluster_replication_factor,
366 environmentd_allowed_origins: args.environmentd_allowed_origins,
367 internal_console_proxy_url: args.internal_console_proxy_url,
368 environmentd_sql_port: args.environmentd_sql_port,
369 environmentd_http_port: args.environmentd_http_port,
370 environmentd_internal_sql_port: args.environmentd_internal_sql_port,
371 environmentd_internal_http_port: args.environmentd_internal_http_port,
372 environmentd_internal_persist_pubsub_port: args
373 .environmentd_internal_persist_pubsub_port,
374 default_certificate_specs: args.default_certificate_specs.clone(),
375 disable_license_key_checks: args.disable_license_key_checks,
376 tracing: args.tracing,
377 orchestratord_namespace: namespace,
378 },
379 Arc::clone(&metrics),
380 client.clone(),
381 )
382 .await,
383 watcher::Config::default().timeout(29),
384 )
385 .run(),
386 );
387
388 mz_ore::task::spawn(
389 || "balancer controller",
390 k8s_controller::Controller::namespaced_all(
391 client.clone(),
392 controller::balancer::Context::new(
393 controller::balancer::Config {
394 enable_security_context: args.enable_security_context,
395 enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
396 image_pull_policy: args.image_pull_policy,
397 scheduler_name: args.scheduler_name.clone(),
398 balancerd_node_selector: args.balancerd_node_selector,
399 balancerd_affinity: args.balancerd_affinity,
400 balancerd_tolerations: args.balancerd_tolerations,
401 balancerd_default_resources: args.balancerd_default_resources,
402 default_certificate_specs: args.default_certificate_specs.clone(),
403 environmentd_sql_port: args.environmentd_sql_port,
404 environmentd_http_port: args.environmentd_http_port,
405 balancerd_sql_port: args.balancerd_sql_port,
406 balancerd_http_port: args.balancerd_http_port,
407 balancerd_internal_http_port: args.balancerd_internal_http_port,
408 },
409 client.clone(),
410 )
411 .await,
412 watcher::Config::default().timeout(29),
413 )
414 .with_controller(|controller| {
415 controller
416 .owns(
417 Api::<Deployment>::all(client.clone()),
418 watcher::Config::default()
419 .labels("materialize.cloud/mz-resource-id")
420 .timeout(29),
421 )
422 .owns(
423 Api::<Service>::all(client.clone()),
424 watcher::Config::default()
425 .labels("materialize.cloud/mz-resource-id")
426 .timeout(29),
427 )
428 .owns(
429 Api::<Certificate>::all(client.clone()),
430 watcher::Config::default()
431 .labels("materialize.cloud/mz-resource-id")
432 .timeout(29),
433 )
434 })
435 .run(),
436 );
437
438 mz_ore::task::spawn(
439 || "console controller",
440 k8s_controller::Controller::namespaced_all(
441 client.clone(),
442 controller::console::Context::new(
443 controller::console::Config {
444 enable_security_context: args.enable_security_context,
445 enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
446 image_pull_policy: args.image_pull_policy,
447 scheduler_name: args.scheduler_name,
448 console_node_selector: args.console_node_selector,
449 console_affinity: args.console_affinity,
450 console_tolerations: args.console_tolerations,
451 console_default_resources: args.console_default_resources,
452 network_policies_ingress_enabled: args.network_policies_ingress_enabled,
453 network_policies_ingress_cidrs: args.network_policies_ingress_cidrs,
454 default_certificate_specs: args.default_certificate_specs,
455 console_http_port: args.console_http_port,
456 balancerd_http_port: args.balancerd_http_port,
457 },
458 client.clone(),
459 )
460 .await,
461 watcher::Config::default().timeout(29),
462 )
463 .with_controller(|controller| {
464 controller
465 .owns(
466 Api::<Deployment>::all(client.clone()),
467 watcher::Config::default()
468 .labels("materialize.cloud/mz-resource-id")
469 .timeout(29),
470 )
471 .owns(
472 Api::<Service>::all(client.clone()),
473 watcher::Config::default()
474 .labels("materialize.cloud/mz-resource-id")
475 .timeout(29),
476 )
477 .owns(
478 Api::<Certificate>::all(client.clone()),
479 watcher::Config::default()
480 .labels("materialize.cloud/mz-resource-id")
481 .timeout(29),
482 )
483 .owns(
484 Api::<NetworkPolicy>::all(client.clone()),
485 watcher::Config::default()
486 .labels("materialize.cloud/mz-resource-id")
487 .timeout(29),
488 )
489 .owns(
490 Api::<ConfigMap>::all(client.clone()),
491 watcher::Config::default()
492 .labels("materialize.cloud/mz-resource-id")
493 .timeout(29),
494 )
495 })
496 .run(),
497 );
498
499 info!("All tasks started successfully.");
500
501 future::pending().await
502}