1use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13
14use anyhow::Context;
15use axum::http::StatusCode;
16use axum::routing;
17use fail::FailScenario;
18use futures::future;
19use hyper_util::rt::TokioIo;
20use mz_build_info::{BuildInfo, build_info};
21use mz_cloud_resources::AwsExternalIdPrefix;
22use mz_compute::server::ComputeInstanceContext;
23use mz_compute_client::service::proto_compute_server::ProtoComputeServer;
24use mz_http_util::DynamicFilterTarget;
25use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
26use mz_ore::cli::{self, CliConfig};
27use mz_ore::error::ErrorExt;
28use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
29use mz_ore::netio::{Listener, SocketAddr};
30use mz_ore::now::SYSTEM_TIME;
31use mz_persist_client::cache::PersistClientCache;
32use mz_persist_client::cfg::PersistConfig;
33use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
34use mz_service::emit_boot_diagnostics;
35use mz_service::grpc::{GrpcServer, GrpcServerMetrics, MAX_GRPC_MESSAGE_SIZE};
36use mz_service::secrets::SecretsReaderCliArgs;
37use mz_storage::storage_state::StorageInstanceContext;
38use mz_storage_client::client::proto_storage_server::ProtoStorageServer;
39use mz_storage_types::connections::ConnectionContext;
40use mz_txn_wal::operator::TxnsContext;
41use tokio::runtime::Handle;
42use tower::Service;
43use tracing::{error, info};
44
/// Build metadata for this binary, produced by the `build_info!` macro.
///
/// Used for the tracing configuration, boot diagnostics, the persist
/// configuration, and the version reported by the gRPC servers.
const BUILD_INFO: BuildInfo = build_info!();

/// Human-readable version string for `clusterd`.
///
/// Computed lazily on first access from `BUILD_INFO.human_version(None)` and
/// used as the `version` of the command-line parser.
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
48
// Command-line arguments for the `clusterd` binary, parsed with clap's derive
// API. Most options can also be supplied through the environment variable
// named in their `env = "..."` attribute.
//
// NOTE(review): the comments in this struct are deliberately plain `//`
// comments rather than `///` doc comments. clap's derive turns doc comments
// into `--help` text, so promoting them would change the program's output;
// do that deliberately if help text is wanted.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // Address on which the storage gRPC server listens for connections from
    // the storage controller.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    // Address on which the compute gRPC server listens for connections from
    // the compute controller.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    // Address on which the internal HTTP server (liveness, metrics, and
    // tracing endpoints) is served.
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    // Optional host name handed to both gRPC servers. An empty string is
    // treated the same as not providing the option at all.
    // NOTE(review): presumably used to validate the host of incoming gRPC
    // requests — confirm against `GrpcServer::serve`.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    // URL of the Persist PubSub service that the persist client cache
    // connects to.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // Optional AWS external ID prefix, parsed by
    // `AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable` and
    // forwarded to the connection context. Note that the environment variable
    // is named `AWS_EXTERNAL_ID`, without the `_PREFIX` suffix of the field.
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    // Optional AWS role ARN forwarded to the connection context.
    // NOTE(review): presumably the role assumed on behalf of AWS connections —
    // confirm against `ConnectionContext::from_cli_args`.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // Options for constructing the secrets reader; flattened into this parser.
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // Tracing and logging options; flattened into this parser.
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // Identifier of the environment this process runs in; forwarded to the
    // connection context. It has no default and is not optional.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    // Optional directory handed to both the storage and the compute instance
    // contexts.
    // NOTE(review): presumably used for ephemeral on-disk state — confirm.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    // Optional memory limit forwarded to the persist configuration and the
    // storage instance context. No environment variable is declared for it.
    // NOTE(review): unit is presumably bytes — TODO confirm.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    // Sets `is_cc_active` on the persist configuration.
    #[clap(long)]
    is_cc: bool,

    // Forwarded to the compute instance context as `worker_core_affinity`.
    // NOTE(review): presumably pins compute workers to CPU cores — confirm.
    #[clap(long)]
    worker_core_affinity: bool,
}
144
145pub fn main() {
146 mz_ore::panic::install_enhanced_handler();
147
148 let args = cli::parse_args(CliConfig {
149 env_prefix: Some("CLUSTERD_"),
150 enable_version_flag: true,
151 });
152
153 let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
154 let runtime = tokio::runtime::Builder::new_multi_thread()
155 .worker_threads(ncpus_useful)
156 .thread_name_fn(|| {
161 use std::sync::atomic::{AtomicUsize, Ordering};
162 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
163 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
164 format!("tokio:work-{}", id)
165 })
166 .enable_all()
167 .build()
168 .unwrap();
169 if let Err(err) = runtime.block_on(run(args)) {
170 panic!("clusterd: fatal: {}", err.display_with_causes());
171 }
172}
173
/// Boots the `clusterd` services and then parks forever.
///
/// In order, this function:
///
/// 1. configures tracing and registers runtime, allocator, and general
///    metrics into a fresh [`MetricsRegistry`];
/// 2. loads the secrets reader;
/// 3. binds and spawns the internal HTTP server;
/// 4. builds the persist client cache shared by storage and compute;
/// 5. starts the storage and compute gRPC servers on their listen addresses.
///
/// # Errors
///
/// Returns an error if configuring tracing, loading the secrets reader,
/// binding the internal HTTP listener, constructing the storage instance
/// context, or setting up the storage or compute server fails. If startup
/// succeeds the returned future never resolves.
async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    // `_tracing_guard` is bound to a name so that it lives until the end of
    // this function rather than being dropped immediately.
    let (tracing_handle, _tracing_guard) = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    // The handle is shared between the storage and compute servers below.
    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    // NOTE(review): this runs after tracing has been configured; the relative
    // order of the two looks deliberate (panic hook ordering) — confirm before
    // moving it.
    mz_timely_util::panic::halt_on_timely_communication_panic();

    // Set up failpoints; the binding keeps the scenario alive for as long as
    // this function runs.
    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    // Internal HTTP server. The block below is evaluated here, in `run`, so a
    // failure to bind the listener is returned from `run` via `?` before any
    // task is spawned. Only the trailing `async move` block becomes the task.
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        // Separate handle to the registry, moved into the `/metrics` handler.
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            // The two `PUT` endpoints below no longer change any filter: they
            // ignore the request payload and always answer `400 Bad Request`
            // with a message naming the system variable to use instead.
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .into_make_service();

        async move {
            // Accept loop: every accepted connection is served on its own
            // task. An accept error is logged and ends the loop, after which
            // no further internal HTTP connections are accepted.
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        error!("internal_http connection failed: {error:#}");
                        break;
                    }
                };

                // Build a per-connection service and adapt it from a tower
                // service to a hyper service; each request uses a fresh clone.
                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            error!("failed to serve internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

    // Caller ID for Persist PubSub: the `HOSTNAME` environment variable if it
    // is set, otherwise the tracing log prefix, otherwise the empty string.
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // Compaction is disabled on this configuration at startup.
    // NOTE(review): presumably it is re-enabled elsewhere later on — confirm.
    persist_cfg.disable_compaction();

    // A single persist client cache, shared by the storage and compute
    // servers. The closure connects the PubSub client used by the cache.
    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

    // An empty host name is treated the same as no host name.
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

    // Storage server. Shared state is cloned here because the compute server
    // below takes ownership of the originals.
    let storage_client_builder = mz_storage::serve(
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
    )?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "storage_server",
        GrpcServer::serve(
            &grpc_server_metrics,
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            storage_client_builder,
            |svc| ProtoStorageServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ),
    );

    // Compute server. This consumes the persist client cache, the transaction
    // context, the tracing handle, the scratch directory, and the connection
    // context.
    let compute_client_builder = mz_compute::server::serve(
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
    )?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "compute_server",
        GrpcServer::serve(
            &grpc_server_metrics,
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host,
            compute_client_builder,
            |svc| ProtoComputeServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ),
    );

    // Never resolves: the spawned servers do the work, and the locals above
    // (including the tracing guard and failpoint scenario) stay alive.
    future::pending().await
}