orchestratord/
orchestratord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    future,
12    net::SocketAddr,
13    sync::{Arc, LazyLock},
14};
15
16use http::HeaderValue;
17use k8s_openapi::{
18    api::{
19        apps::v1::Deployment,
20        core::v1::{Affinity, ConfigMap, ResourceRequirements, Service, Toleration},
21        networking::v1::NetworkPolicy,
22    },
23    apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceColumnDefinition,
24};
25use kube::{Api, runtime::watcher};
26use mz_cloud_provider::CloudProvider;
27use mz_cloud_resources::crd::generated::cert_manager::certificates::Certificate;
28use tracing::info;
29
30use mz_build_info::{BuildInfo, build_info};
31use mz_orchestrator_kubernetes::{KubernetesImagePullPolicy, util::create_client};
32use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
33use mz_orchestratord::{
34    controller,
35    k8s::register_crds,
36    metrics::{self, Metrics},
37    tls::DefaultCertificateSpecs,
38};
39use mz_ore::{
40    cli::{self, CliConfig, KeyValueArg},
41    error::ErrorExt,
42    metrics::MetricsRegistry,
43};
44
// Build metadata for this binary, produced by the `build_info!` macro. It is
// handed to tracing configuration and to the profiling router in `run`.
const BUILD_INFO: BuildInfo = build_info!();
// Human-readable version string, computed once on first access. It is a
// `static` so that the `#[clap(version = VERSION.as_str())]` attribute on
// `Args` can borrow it as a `&str`.
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
47
48#[derive(clap::Parser)]
49#[clap(name = "orchestratord", version = VERSION.as_str())]
50pub struct Args {
51    #[structopt(long, env = "KUBERNETES_CONTEXT", default_value = "minikube")]
52    kubernetes_context: String,
53
54    #[clap(long, default_value = "[::]:8004")]
55    profiling_listen_address: SocketAddr,
56    #[clap(long, default_value = "[::]:3100")]
57    metrics_listen_address: SocketAddr,
58
59    #[clap(long)]
60    cloud_provider: CloudProvider,
61    #[clap(long)]
62    region: String,
63    #[clap(long)]
64    create_balancers: bool,
65    #[clap(long)]
66    create_console: bool,
67    #[clap(long)]
68    helm_chart_version: Option<String>,
69    #[clap(long, default_value = "kubernetes")]
70    secrets_controller: String,
71    #[clap(long)]
72    collect_pod_metrics: bool,
73    #[clap(long)]
74    enable_prometheus_scrape_annotations: bool,
75    #[clap(long)]
76    disable_authentication: bool,
77
78    #[clap(long)]
79    segment_api_key: Option<String>,
80    #[clap(long)]
81    segment_client_side: bool,
82
83    #[clap(long)]
84    console_image_tag_default: String,
85    #[clap(long)]
86    console_image_tag_map: Vec<KeyValueArg<String, String>>,
87
88    #[clap(long)]
89    aws_account_id: Option<String>,
90    #[clap(long)]
91    environmentd_iam_role_arn: Option<String>,
92    #[clap(long)]
93    environmentd_connection_role_arn: Option<String>,
94    #[clap(long)]
95    aws_secrets_controller_tags: Vec<String>,
96    #[clap(long)]
97    environmentd_availability_zones: Option<Vec<String>>,
98
99    #[clap(long)]
100    ephemeral_volume_class: Option<String>,
101    #[clap(long)]
102    scheduler_name: Option<String>,
103    #[clap(long)]
104    enable_security_context: bool,
105    #[clap(long)]
106    enable_internal_statement_logging: bool,
107    #[clap(long, default_value = "false")]
108    disable_statement_logging: bool,
109
110    #[clap(long)]
111    orchestratord_pod_selector_labels: Vec<KeyValueArg<String, String>>,
112    #[clap(long)]
113    environmentd_node_selector: Vec<KeyValueArg<String, String>>,
114    #[clap(long, value_parser = parse_affinity)]
115    environmentd_affinity: Option<Affinity>,
116    #[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
117    environmentd_tolerations: Option<Vec<Toleration>>,
118    #[clap(long, value_parser = parse_resources)]
119    environmentd_default_resources: Option<ResourceRequirements>,
120    #[clap(long)]
121    clusterd_node_selector: Vec<KeyValueArg<String, String>>,
122    #[clap(long, value_parser = parse_affinity)]
123    clusterd_affinity: Option<Affinity>,
124    #[clap(long = "clusterd-toleration", value_parser = parse_tolerations)]
125    clusterd_tolerations: Option<Vec<Toleration>>,
126    #[clap(long)]
127    balancerd_node_selector: Vec<KeyValueArg<String, String>>,
128    #[clap(long, value_parser = parse_affinity)]
129    balancerd_affinity: Option<Affinity>,
130    #[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
131    balancerd_tolerations: Option<Vec<Toleration>>,
132    #[clap(long, value_parser = parse_resources)]
133    balancerd_default_resources: Option<ResourceRequirements>,
134    #[clap(long)]
135    console_node_selector: Vec<KeyValueArg<String, String>>,
136    #[clap(long, value_parser = parse_affinity)]
137    console_affinity: Option<Affinity>,
138    #[clap(long = "console-toleration", value_parser = parse_tolerations)]
139    console_tolerations: Option<Vec<Toleration>>,
140    #[clap(long, value_parser = parse_resources)]
141    console_default_resources: Option<ResourceRequirements>,
142    #[clap(long, default_value = "always", value_enum)]
143    image_pull_policy: KubernetesImagePullPolicy,
144    #[clap(long)]
145    network_policies_internal_enabled: bool,
146    #[clap(long)]
147    network_policies_ingress_enabled: bool,
148    #[clap(long)]
149    network_policies_ingress_cidrs: Vec<String>,
150    #[clap(long)]
151    network_policies_egress_enabled: bool,
152    #[clap(long)]
153    network_policies_egress_cidrs: Vec<String>,
154
155    #[clap(long)]
156    environmentd_cluster_replica_sizes: Option<String>,
157    #[clap(long)]
158    bootstrap_default_cluster_replica_size: Option<String>,
159    #[clap(long)]
160    bootstrap_builtin_system_cluster_replica_size: Option<String>,
161    #[clap(long)]
162    bootstrap_builtin_probe_cluster_replica_size: Option<String>,
163    #[clap(long)]
164    bootstrap_builtin_support_cluster_replica_size: Option<String>,
165    #[clap(long)]
166    bootstrap_builtin_catalog_server_cluster_replica_size: Option<String>,
167    #[clap(long)]
168    bootstrap_builtin_analytics_cluster_replica_size: Option<String>,
169    #[clap(long)]
170    bootstrap_builtin_system_cluster_replication_factor: Option<u32>,
171    #[clap(long)]
172    bootstrap_builtin_probe_cluster_replication_factor: Option<u32>,
173    #[clap(long)]
174    bootstrap_builtin_support_cluster_replication_factor: Option<u32>,
175    #[clap(long)]
176    bootstrap_builtin_analytics_cluster_replication_factor: Option<u32>,
177
178    #[clap(
179        long,
180        default_values = &["http://local.dev.materialize.com:3000", "http://local.mtrlz.com:3000", "http://localhost:3000", "https://staging.console.materialize.com"],
181    )]
182    environmentd_allowed_origins: Vec<HeaderValue>,
183    #[clap(long, default_value = "https://console.materialize.com")]
184    internal_console_proxy_url: String,
185
186    #[clap(long, default_value = "6875")]
187    environmentd_sql_port: u16,
188    #[clap(long, default_value = "6876")]
189    environmentd_http_port: u16,
190    #[clap(long, default_value = "6877")]
191    environmentd_internal_sql_port: u16,
192    #[clap(long, default_value = "6878")]
193    environmentd_internal_http_port: u16,
194    #[clap(long, default_value = "6879")]
195    environmentd_internal_persist_pubsub_port: u16,
196
197    #[clap(long, default_value = "6875")]
198    balancerd_sql_port: u16,
199    #[clap(long, default_value = "6876")]
200    balancerd_http_port: u16,
201    #[clap(long, default_value = "8080")]
202    balancerd_internal_http_port: u16,
203
204    #[clap(long, default_value = "8080")]
205    console_http_port: u16,
206
207    #[clap(long, default_value = "{}")]
208    default_certificate_specs: DefaultCertificateSpecs,
209
210    #[clap(long, hide = true)]
211    disable_license_key_checks: bool,
212
213    #[clap(flatten)]
214    tracing: TracingCliArgs,
215
216    #[clap(long, hide = true, value_parser(parse_crd_columns))]
217    additional_crd_columns: Option<std::vec::Vec<CustomResourceColumnDefinition>>,
218}
219
220fn parse_affinity(s: &str) -> anyhow::Result<Affinity> {
221    Ok(serde_json::from_str(s)?)
222}
223
224fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
225    Ok(serde_json::from_str(s)?)
226}
227
228fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
229    Ok(serde_json::from_str(s)?)
230}
231
232fn parse_crd_columns(val: &str) -> Result<Vec<CustomResourceColumnDefinition>, serde_json::Error> {
233    serde_json::from_str(val)
234}
235
236#[tokio::main]
237async fn main() {
238    mz_ore::panic::install_enhanced_handler();
239
240    let args = cli::parse_args(CliConfig {
241        env_prefix: Some("ORCHESTRATORD_"),
242        enable_version_flag: true,
243    });
244    if let Err(err) = run(args).await {
245        panic!("orchestratord: fatal: {}", err.display_with_causes());
246    }
247}
248
249async fn run(args: Args) -> Result<(), anyhow::Error> {
250    let metrics_registry = MetricsRegistry::new();
251    args.tracing
252        .configure_tracing(
253            StaticTracingConfig {
254                service_name: "orchestratord",
255                build_info: BUILD_INFO,
256            },
257            metrics_registry.clone(),
258        )
259        .await?;
260
261    let metrics = Arc::new(Metrics::register_into(&metrics_registry));
262
263    let (client, namespace) = create_client(args.kubernetes_context.clone()).await?;
264    register_crds(
265        client.clone(),
266        args.additional_crd_columns.unwrap_or_default(),
267    )
268    .await?;
269
270    {
271        let router = mz_prof_http::router(&BUILD_INFO);
272        let address = args.profiling_listen_address.clone();
273        mz_ore::task::spawn(|| "profiling API internal web server", async move {
274            if let Err(e) = axum::serve(
275                tokio::net::TcpListener::bind(&address).await.unwrap(),
276                router.into_make_service(),
277            )
278            .await
279            {
280                panic!(
281                    "profiling API internal web server failed: {}",
282                    e.display_with_causes()
283                );
284            }
285        });
286    }
287
288    {
289        let router = metrics::router(metrics_registry.clone());
290        let address = args.metrics_listen_address.clone();
291        mz_ore::task::spawn(|| "metrics server", async move {
292            if let Err(e) = axum::serve(
293                tokio::net::TcpListener::bind(&address).await.unwrap(),
294                router.into_make_service(),
295            )
296            .await
297            {
298                panic!("metrics server failed: {}", e.display_with_causes());
299            }
300        });
301    }
302
303    mz_ore::task::spawn(
304        || "materialize controller",
305        k8s_controller::Controller::namespaced_all(
306            client.clone(),
307            controller::materialize::Context::new(
308                controller::materialize::Config {
309                    cloud_provider: args.cloud_provider,
310                    region: args.region,
311                    create_balancers: args.create_balancers,
312                    create_console: args.create_console,
313                    helm_chart_version: args.helm_chart_version,
314                    secrets_controller: args.secrets_controller,
315                    collect_pod_metrics: args.collect_pod_metrics,
316                    enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
317                    segment_api_key: args.segment_api_key,
318                    segment_client_side: args.segment_client_side,
319                    console_image_tag_default: args.console_image_tag_default,
320                    console_image_tag_map: args.console_image_tag_map,
321                    aws_account_id: args.aws_account_id,
322                    environmentd_iam_role_arn: args.environmentd_iam_role_arn,
323                    environmentd_connection_role_arn: args.environmentd_connection_role_arn,
324                    aws_secrets_controller_tags: args.aws_secrets_controller_tags,
325                    environmentd_availability_zones: args.environmentd_availability_zones,
326                    ephemeral_volume_class: args.ephemeral_volume_class,
327                    scheduler_name: args.scheduler_name.clone(),
328                    enable_security_context: args.enable_security_context,
329                    enable_internal_statement_logging: args.enable_internal_statement_logging,
330                    disable_statement_logging: args.disable_statement_logging,
331                    orchestratord_pod_selector_labels: args.orchestratord_pod_selector_labels,
332                    environmentd_node_selector: args.environmentd_node_selector,
333                    environmentd_affinity: args.environmentd_affinity,
334                    environmentd_tolerations: args.environmentd_tolerations,
335                    environmentd_default_resources: args.environmentd_default_resources,
336                    clusterd_node_selector: args.clusterd_node_selector,
337                    clusterd_affinity: args.clusterd_affinity,
338                    clusterd_tolerations: args.clusterd_tolerations,
339                    image_pull_policy: args.image_pull_policy,
340                    network_policies_internal_enabled: args.network_policies_internal_enabled,
341                    network_policies_ingress_enabled: args.network_policies_ingress_enabled,
342                    network_policies_ingress_cidrs: args.network_policies_ingress_cidrs.clone(),
343                    network_policies_egress_enabled: args.network_policies_egress_enabled,
344                    network_policies_egress_cidrs: args.network_policies_egress_cidrs,
345                    environmentd_cluster_replica_sizes: args.environmentd_cluster_replica_sizes,
346                    bootstrap_default_cluster_replica_size: args
347                        .bootstrap_default_cluster_replica_size,
348                    bootstrap_builtin_system_cluster_replica_size: args
349                        .bootstrap_builtin_system_cluster_replica_size,
350                    bootstrap_builtin_probe_cluster_replica_size: args
351                        .bootstrap_builtin_probe_cluster_replica_size,
352                    bootstrap_builtin_support_cluster_replica_size: args
353                        .bootstrap_builtin_support_cluster_replica_size,
354                    bootstrap_builtin_catalog_server_cluster_replica_size: args
355                        .bootstrap_builtin_catalog_server_cluster_replica_size,
356                    bootstrap_builtin_analytics_cluster_replica_size: args
357                        .bootstrap_builtin_analytics_cluster_replica_size,
358                    bootstrap_builtin_system_cluster_replication_factor: args
359                        .bootstrap_builtin_system_cluster_replication_factor,
360                    bootstrap_builtin_probe_cluster_replication_factor: args
361                        .bootstrap_builtin_probe_cluster_replication_factor,
362                    bootstrap_builtin_support_cluster_replication_factor: args
363                        .bootstrap_builtin_support_cluster_replication_factor,
364                    bootstrap_builtin_analytics_cluster_replication_factor: args
365                        .bootstrap_builtin_analytics_cluster_replication_factor,
366                    environmentd_allowed_origins: args.environmentd_allowed_origins,
367                    internal_console_proxy_url: args.internal_console_proxy_url,
368                    environmentd_sql_port: args.environmentd_sql_port,
369                    environmentd_http_port: args.environmentd_http_port,
370                    environmentd_internal_sql_port: args.environmentd_internal_sql_port,
371                    environmentd_internal_http_port: args.environmentd_internal_http_port,
372                    environmentd_internal_persist_pubsub_port: args
373                        .environmentd_internal_persist_pubsub_port,
374                    default_certificate_specs: args.default_certificate_specs.clone(),
375                    disable_license_key_checks: args.disable_license_key_checks,
376                    tracing: args.tracing,
377                    orchestratord_namespace: namespace,
378                },
379                Arc::clone(&metrics),
380                client.clone(),
381            )
382            .await,
383            watcher::Config::default().timeout(29),
384        )
385        .run(),
386    );
387
388    mz_ore::task::spawn(
389        || "balancer controller",
390        k8s_controller::Controller::namespaced_all(
391            client.clone(),
392            controller::balancer::Context::new(
393                controller::balancer::Config {
394                    enable_security_context: args.enable_security_context,
395                    enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
396                    image_pull_policy: args.image_pull_policy,
397                    scheduler_name: args.scheduler_name.clone(),
398                    balancerd_node_selector: args.balancerd_node_selector,
399                    balancerd_affinity: args.balancerd_affinity,
400                    balancerd_tolerations: args.balancerd_tolerations,
401                    balancerd_default_resources: args.balancerd_default_resources,
402                    default_certificate_specs: args.default_certificate_specs.clone(),
403                    environmentd_sql_port: args.environmentd_sql_port,
404                    environmentd_http_port: args.environmentd_http_port,
405                    balancerd_sql_port: args.balancerd_sql_port,
406                    balancerd_http_port: args.balancerd_http_port,
407                    balancerd_internal_http_port: args.balancerd_internal_http_port,
408                },
409                client.clone(),
410            )
411            .await,
412            watcher::Config::default().timeout(29),
413        )
414        .with_controller(|controller| {
415            controller
416                .owns(
417                    Api::<Deployment>::all(client.clone()),
418                    watcher::Config::default()
419                        .labels("materialize.cloud/mz-resource-id")
420                        .timeout(29),
421                )
422                .owns(
423                    Api::<Service>::all(client.clone()),
424                    watcher::Config::default()
425                        .labels("materialize.cloud/mz-resource-id")
426                        .timeout(29),
427                )
428                .owns(
429                    Api::<Certificate>::all(client.clone()),
430                    watcher::Config::default()
431                        .labels("materialize.cloud/mz-resource-id")
432                        .timeout(29),
433                )
434        })
435        .run(),
436    );
437
438    mz_ore::task::spawn(
439        || "console controller",
440        k8s_controller::Controller::namespaced_all(
441            client.clone(),
442            controller::console::Context::new(
443                controller::console::Config {
444                    enable_security_context: args.enable_security_context,
445                    enable_prometheus_scrape_annotations: args.enable_prometheus_scrape_annotations,
446                    image_pull_policy: args.image_pull_policy,
447                    scheduler_name: args.scheduler_name,
448                    console_node_selector: args.console_node_selector,
449                    console_affinity: args.console_affinity,
450                    console_tolerations: args.console_tolerations,
451                    console_default_resources: args.console_default_resources,
452                    network_policies_ingress_enabled: args.network_policies_ingress_enabled,
453                    network_policies_ingress_cidrs: args.network_policies_ingress_cidrs,
454                    default_certificate_specs: args.default_certificate_specs,
455                    console_http_port: args.console_http_port,
456                    balancerd_http_port: args.balancerd_http_port,
457                },
458                client.clone(),
459            )
460            .await,
461            watcher::Config::default().timeout(29),
462        )
463        .with_controller(|controller| {
464            controller
465                .owns(
466                    Api::<Deployment>::all(client.clone()),
467                    watcher::Config::default()
468                        .labels("materialize.cloud/mz-resource-id")
469                        .timeout(29),
470                )
471                .owns(
472                    Api::<Service>::all(client.clone()),
473                    watcher::Config::default()
474                        .labels("materialize.cloud/mz-resource-id")
475                        .timeout(29),
476                )
477                .owns(
478                    Api::<Certificate>::all(client.clone()),
479                    watcher::Config::default()
480                        .labels("materialize.cloud/mz-resource-id")
481                        .timeout(29),
482                )
483                .owns(
484                    Api::<NetworkPolicy>::all(client.clone()),
485                    watcher::Config::default()
486                        .labels("materialize.cloud/mz-resource-id")
487                        .timeout(29),
488                )
489                .owns(
490                    Api::<ConfigMap>::all(client.clone()),
491                    watcher::Config::default()
492                        .labels("materialize.cloud/mz-resource-id")
493                        .timeout(29),
494                )
495        })
496        .run(),
497    );
498
499    info!("All tasks started successfully.");
500
501    future::pending().await
502}