orchestratord/
orchestratord.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{
11    future,
12    net::SocketAddr,
13    sync::{Arc, LazyLock},
14};
15
16use kube::runtime::watcher;
17use tracing::info;
18
19use mz_build_info::{BuildInfo, build_info};
20use mz_orchestrator_kubernetes::util::create_client;
21use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
22use mz_orchestratord::{
23    controller,
24    k8s::register_crds,
25    metrics::{self, Metrics},
26};
27use mz_ore::{
28    cli::{self, CliConfig},
29    error::ErrorExt,
30    metrics::MetricsRegistry,
31};
32
/// Build metadata for this binary, captured at compile time by `build_info!`.
const BUILD_INFO: BuildInfo = build_info!();
/// Human-readable version string derived from [`BUILD_INFO`], computed lazily on first
/// access; used as the `--version` text of [`Args`].
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
35
36#[derive(clap::Parser)]
37#[clap(name = "orchestratord", version = VERSION.as_str())]
38pub struct Args {
39    #[structopt(long, env = "KUBERNETES_CONTEXT", default_value = "minikube")]
40    kubernetes_context: String,
41
42    #[clap(long, default_value = "[::]:8004")]
43    profiling_listen_address: SocketAddr,
44    #[clap(long, default_value = "[::]:3100")]
45    metrics_listen_address: SocketAddr,
46
47    #[clap(flatten)]
48    materialize_controller_args: controller::materialize::MaterializeControllerArgs,
49
50    #[clap(flatten)]
51    tracing: TracingCliArgs,
52}
53
54#[tokio::main]
55async fn main() {
56    mz_ore::panic::install_enhanced_handler();
57
58    let args = cli::parse_args(CliConfig {
59        env_prefix: Some("ORCHESTRATORD_"),
60        enable_version_flag: true,
61    });
62    if let Err(err) = run(args).await {
63        panic!("orchestratord: fatal: {}", err.display_with_causes());
64    }
65}
66
67async fn run(args: Args) -> Result<(), anyhow::Error> {
68    let metrics_registry = MetricsRegistry::new();
69    let (_, _tracing_guard) = args
70        .tracing
71        .configure_tracing(
72            StaticTracingConfig {
73                service_name: "orchestratord",
74                build_info: BUILD_INFO,
75            },
76            metrics_registry.clone(),
77        )
78        .await?;
79
80    let metrics = Arc::new(Metrics::register_into(&metrics_registry));
81
82    let (client, namespace) = create_client(args.kubernetes_context.clone()).await?;
83    register_crds(client.clone()).await?;
84
85    {
86        let router = mz_prof_http::router(&BUILD_INFO);
87        let address = args.profiling_listen_address.clone();
88        mz_ore::task::spawn(|| "profiling API internal web server", async move {
89            if let Err(e) = axum::serve(
90                tokio::net::TcpListener::bind(&address).await.unwrap(),
91                router.into_make_service(),
92            )
93            .await
94            {
95                panic!(
96                    "profiling API internal web server failed: {}",
97                    e.display_with_causes()
98                );
99            }
100        });
101    }
102
103    {
104        let router = metrics::router(metrics_registry.clone());
105        let address = args.metrics_listen_address.clone();
106        mz_ore::task::spawn(|| "metrics server", async move {
107            if let Err(e) = axum::serve(
108                tokio::net::TcpListener::bind(&address).await.unwrap(),
109                router.into_make_service(),
110            )
111            .await
112            {
113                panic!("metrics server failed: {}", e.display_with_causes());
114            }
115        });
116    }
117
118    mz_ore::task::spawn(
119        || "materialize controller",
120        k8s_controller::Controller::namespaced_all(
121            client.clone(),
122            mz_orchestratord::controller::materialize::Context::new(
123                args.materialize_controller_args,
124                args.tracing,
125                namespace,
126                Arc::clone(&metrics),
127            ),
128            watcher::Config::default().timeout(29),
129        )
130        .run(),
131    );
132
133    info!("All tasks started successfully.");
134
135    future::pending().await
136}