orchestratord/
orchestratord.rs
1use std::{
11 future,
12 net::SocketAddr,
13 sync::{Arc, LazyLock},
14};
15
16use kube::runtime::watcher;
17use tracing::info;
18
19use mz_build_info::{BuildInfo, build_info};
20use mz_orchestrator_kubernetes::util::create_client;
21use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
22use mz_orchestratord::{
23 controller,
24 k8s::register_crds,
25 metrics::{self, Metrics},
26};
27use mz_ore::{
28 cli::{self, CliConfig},
29 error::ErrorExt,
30 metrics::MetricsRegistry,
31};
32
33const BUILD_INFO: BuildInfo = build_info!();
34static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
35
36#[derive(clap::Parser)]
37#[clap(name = "orchestratord", version = VERSION.as_str())]
38pub struct Args {
39 #[structopt(long, env = "KUBERNETES_CONTEXT", default_value = "minikube")]
40 kubernetes_context: String,
41
42 #[clap(long, default_value = "[::]:8004")]
43 profiling_listen_address: SocketAddr,
44 #[clap(long, default_value = "[::]:3100")]
45 metrics_listen_address: SocketAddr,
46
47 #[clap(flatten)]
48 materialize_controller_args: controller::materialize::MaterializeControllerArgs,
49
50 #[clap(flatten)]
51 tracing: TracingCliArgs,
52}
53
54#[tokio::main]
55async fn main() {
56 mz_ore::panic::install_enhanced_handler();
57
58 let args = cli::parse_args(CliConfig {
59 env_prefix: Some("ORCHESTRATORD_"),
60 enable_version_flag: true,
61 });
62 if let Err(err) = run(args).await {
63 panic!("orchestratord: fatal: {}", err.display_with_causes());
64 }
65}
66
67async fn run(args: Args) -> Result<(), anyhow::Error> {
68 let metrics_registry = MetricsRegistry::new();
69 let (_, _tracing_guard) = args
70 .tracing
71 .configure_tracing(
72 StaticTracingConfig {
73 service_name: "orchestratord",
74 build_info: BUILD_INFO,
75 },
76 metrics_registry.clone(),
77 )
78 .await?;
79
80 let metrics = Arc::new(Metrics::register_into(&metrics_registry));
81
82 let (client, namespace) = create_client(args.kubernetes_context.clone()).await?;
83 register_crds(client.clone()).await?;
84
85 {
86 let router = mz_prof_http::router(&BUILD_INFO);
87 let address = args.profiling_listen_address.clone();
88 mz_ore::task::spawn(|| "profiling API internal web server", async move {
89 if let Err(e) = axum::serve(
90 tokio::net::TcpListener::bind(&address).await.unwrap(),
91 router.into_make_service(),
92 )
93 .await
94 {
95 panic!(
96 "profiling API internal web server failed: {}",
97 e.display_with_causes()
98 );
99 }
100 });
101 }
102
103 {
104 let router = metrics::router(metrics_registry.clone());
105 let address = args.metrics_listen_address.clone();
106 mz_ore::task::spawn(|| "metrics server", async move {
107 if let Err(e) = axum::serve(
108 tokio::net::TcpListener::bind(&address).await.unwrap(),
109 router.into_make_service(),
110 )
111 .await
112 {
113 panic!("metrics server failed: {}", e.display_with_causes());
114 }
115 });
116 }
117
118 mz_ore::task::spawn(
119 || "materialize controller",
120 k8s_controller::Controller::namespaced_all(
121 client.clone(),
122 mz_orchestratord::controller::materialize::Context::new(
123 args.materialize_controller_args,
124 args.tracing,
125 namespace,
126 Arc::clone(&metrics),
127 ),
128 watcher::Config::default().timeout(29),
129 )
130 .run(),
131 );
132
133 info!("All tasks started successfully.");
134
135 future::pending().await
136}