1use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use anyhow::Context;
16use axum::http::StatusCode;
17use axum::routing;
18use fail::FailScenario;
19use futures::future;
20use hyper_util::rt::TokioIo;
21use mz_build_info::{BuildInfo, build_info};
22use mz_cloud_resources::AwsExternalIdPrefix;
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute::server::ComputeInstanceContext;
25use mz_http_util::DynamicFilterTarget;
26use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
27use mz_ore::cli::{self, CliConfig};
28use mz_ore::error::ErrorExt;
29use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
30use mz_ore::netio::{Listener, SocketAddr};
31use mz_ore::now::SYSTEM_TIME;
32use mz_persist_client::cache::PersistClientCache;
33use mz_persist_client::cfg::PersistConfig;
34use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
35use mz_service::emit_boot_diagnostics;
36use mz_service::grpc::GrpcServerMetrics;
37use mz_service::secrets::SecretsReaderCliArgs;
38use mz_service::transport;
39use mz_storage::storage_state::StorageInstanceContext;
40use mz_storage_types::connections::ConnectionContext;
41use mz_txn_wal::operator::TxnsContext;
42use tokio::runtime::Handle;
43use tower::Service;
44use tracing::{error, info};
45
46mod usage_metrics;
47
/// Build metadata for this binary, produced by the `build_info!` macro in a
/// `const` context.
const BUILD_INFO: BuildInfo = build_info!();

/// Human-readable version string, computed lazily from [`BUILD_INFO`] on
/// first access. Used as the clap `version` for `clusterd --version`.
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
51
// Command-line arguments for `clusterd`, parsed by clap in `main`.
//
// NOTE: plain `//` comments are used deliberately throughout this struct.
// With `clap::Parser`, `///` doc comments are turned into `about`/`--help`
// text, so adding them here would change the CLI's user-visible output.
//
// NOTE(review): `main` passes `env_prefix: Some("CLUSTERD_")` to
// `mz_ore::cli::parse_args`; presumably the `env = "..."` names below are
// looked up with that prefix (e.g. `CLUSTERD_PROCESS`) — confirm in mz_ore.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // Address on which `run` serves storage controller connections.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    // Address on which `run` serves compute controller connections.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    // Address of the internal HTTP server (`/api/livez`, `/metrics`,
    // `/api/tracing`, `/api/usage-metrics`, plus the `mz_prof_http` routes).
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    // Optional host name forwarded to the storage and compute transport
    // servers. `run` treats an empty value the same as an absent one.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    // Timely configuration for the storage server. Its `process` field is
    // overwritten in `run` with the value of `process` below.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    // Timely configuration for the compute server. Its `process` field is
    // overwritten in `run` with the value of `process` below.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    // This process's identifier, copied into both timely configs.
    // NOTE(review): presumably the process index within the replica — confirm.
    #[clap(long, env = "PROCESS")]
    process: usize,

    // URL of the persist PubSub service; used to build the gRPC PubSub client.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // Optional AWS external ID prefix, validated by the custom value parser
    // and passed to the connection context.
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    // Optional AWS role ARN passed to the connection context.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // Secrets reader configuration; loaded once at startup and handed to the
    // connection context.
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // Tracing/logging configuration. `log_prefix` is also the fallback persist
    // PubSub caller ID, and `startup_log_filter` is passed to the connection
    // context.
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // Environment identifier passed to the connection context.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    // Optional scratch directory: the usage-metrics disk root, and handed to
    // both the storage and compute instance contexts.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    // Optional memory limit forwarded to the persist config and the storage
    // instance context. NOTE(review): units presumably bytes — confirm.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    // Optional heap limit. When set, the compute memory limiter is started with
    // it; otherwise the limiter is disabled. NOTE(review): presumably bytes.
    #[clap(long)]
    heap_limit: Option<usize>,

    // Copied into the persist config's `is_cc_active` flag.
    // NOTE(review): meaning of "cc" is not established in this file — confirm.
    #[clap(long)]
    is_cc: bool,

    // Forwarded to the compute instance context.
    // NOTE(review): presumably pins compute workers to CPU cores — confirm.
    #[clap(long)]
    worker_core_affinity: bool,
}
170
171pub fn main() {
172 mz_ore::panic::install_enhanced_handler();
173
174 let args = cli::parse_args(CliConfig {
175 env_prefix: Some("CLUSTERD_"),
176 enable_version_flag: true,
177 });
178
179 let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
180 let runtime = tokio::runtime::Builder::new_multi_thread()
181 .worker_threads(ncpus_useful)
182 .thread_name_fn(|| {
187 use std::sync::atomic::{AtomicUsize, Ordering};
188 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
189 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
190 format!("tokio:work-{}", id)
191 })
192 .enable_all()
193 .build()
194 .unwrap();
195 if let Err(err) = runtime.block_on(run(args)) {
196 panic!("clusterd: fatal: {}", err.display_with_causes());
197 }
198}
199
200async fn run(args: Args) -> Result<(), anyhow::Error> {
201 let metrics_registry = MetricsRegistry::new();
202 let (tracing_handle, _tracing_guard) = args
203 .tracing
204 .configure_tracing(
205 StaticTracingConfig {
206 service_name: "clusterd",
207 build_info: BUILD_INFO,
208 },
209 metrics_registry.clone(),
210 )
211 .await?;
212
213 let tracing_handle = Arc::new(tracing_handle);
214 register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);
215
216 mz_timely_util::panic::halt_on_timely_communication_panic();
219
220 let _failpoint_scenario = FailScenario::setup();
221
222 emit_boot_diagnostics!(&BUILD_INFO);
223
224 mz_alloc::register_metrics_into(&metrics_registry).await;
225 mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;
226
227 if let Some(heap_limit) = args.heap_limit {
228 mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
229 } else {
230 info!("no heap limit announced; disabling memory limiter");
231 }
232
233 let secrets_reader = args
234 .secrets
235 .load()
236 .await
237 .context("loading secrets reader")?;
238
239 let usage_collector = Arc::new(usage_metrics::Collector {
240 disk_root: args.scratch_directory.clone(),
241 });
242
243 mz_ore::task::spawn(|| "clusterd_internal_http_server", {
244 let metrics_registry = metrics_registry.clone();
245 tracing::info!(
246 "serving internal HTTP server on {}",
247 args.internal_http_listen_addr
248 );
249 let listener = Listener::bind(args.internal_http_listen_addr).await?;
250 let mut make_service = mz_prof_http::router(&BUILD_INFO)
251 .route(
252 "/api/livez",
253 routing::get(mz_http_util::handle_liveness_check),
254 )
255 .route(
256 "/metrics",
257 routing::get(move || async move {
258 mz_http_util::handle_prometheus(&metrics_registry).await
259 }),
260 )
261 .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
262 .route(
263 "/api/opentelemetry/config",
264 routing::put({
265 move |_: axum::Json<DynamicFilterTarget>| async {
266 (
267 StatusCode::BAD_REQUEST,
268 "This endpoint has been replaced. \
269 Use the `opentelemetry_filter` system variable."
270 .to_string(),
271 )
272 }
273 }),
274 )
275 .route(
276 "/api/stderr/config",
277 routing::put({
278 move |_: axum::Json<DynamicFilterTarget>| async {
279 (
280 StatusCode::BAD_REQUEST,
281 "This endpoint has been replaced. \
282 Use the `log_filter` system variable."
283 .to_string(),
284 )
285 }
286 }),
287 )
288 .route(
289 "/api/usage-metrics",
290 routing::get(async move || axum::Json(usage_collector.collect())),
291 )
292 .into_make_service();
293
294 async move {
297 loop {
298 let (conn, remote_addr) = match listener.accept().await {
299 Ok(peer) => peer,
300 Err(error) => {
301 error!("internal_http connection failed: {error:#}");
302 break;
303 }
304 };
305
306 let tower_service = make_service.call(&conn).await.expect("infallible");
307 let hyper_service =
308 hyper::service::service_fn(move |req| tower_service.clone().call(req));
309
310 mz_ore::task::spawn(
311 || format!("clusterd_internal_http_server:{remote_addr}"),
312 async move {
313 if let Err(error) = hyper::server::conn::http1::Builder::new()
314 .serve_connection(TokioIo::new(conn), hyper_service)
315 .await
316 {
317 error!("failed to serve internal_http connection: {error:#}");
318 }
319 },
320 );
321 }
322 }
323 });
324
325 let pubsub_caller_id = std::env::var("HOSTNAME")
326 .ok()
327 .or_else(|| args.tracing.log_prefix.clone())
328 .unwrap_or_default();
329 let mut persist_cfg =
330 PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
331 persist_cfg.is_cc_active = args.is_cc;
332 persist_cfg.announce_memory_limit = args.announce_memory_limit;
333 persist_cfg.disable_compaction();
335
336 let persist_clients = Arc::new(PersistClientCache::new(
337 persist_cfg,
338 &metrics_registry,
339 |persist_cfg, metrics| {
340 let cfg = PersistPubSubClientConfig {
341 url: args.persist_pubsub_url,
342 caller_id: pubsub_caller_id,
343 persist_cfg: persist_cfg.clone(),
344 };
345 GrpcPubSubClient::connect(cfg, metrics)
346 },
347 ));
348 let txns_ctx = TxnsContext::default();
349
350 let connection_context = ConnectionContext::from_cli_args(
351 args.environment_id,
352 &args.tracing.startup_log_filter,
353 args.aws_external_id_prefix,
354 args.aws_connection_role_arn,
355 secrets_reader,
356 None,
357 );
358
359 let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
360 let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);
361
362 let mut storage_timely_config = args.storage_timely_config;
363 storage_timely_config.process = args.process;
364 let mut compute_timely_config = args.compute_timely_config;
365 compute_timely_config.process = args.process;
366
367 let storage_client_builder = mz_storage::serve(
369 storage_timely_config,
370 &metrics_registry,
371 Arc::clone(&persist_clients),
372 txns_ctx.clone(),
373 Arc::clone(&tracing_handle),
374 SYSTEM_TIME.clone(),
375 connection_context.clone(),
376 StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
377 )
378 .await?;
379 info!(
380 "listening for storage controller connections on {}",
381 args.storage_controller_listen_addr
382 );
383 mz_ore::task::spawn(
384 || "storage_server",
385 transport::serve(
386 args.storage_controller_listen_addr,
387 BUILD_INFO.semver_version(),
388 grpc_host.clone(),
389 Duration::MAX,
390 storage_client_builder,
391 grpc_server_metrics.for_server("storage"),
392 ),
393 );
394
395 let compute_client_builder = mz_compute::server::serve(
397 compute_timely_config,
398 &metrics_registry,
399 persist_clients,
400 txns_ctx,
401 tracing_handle,
402 ComputeInstanceContext {
403 scratch_directory: args.scratch_directory,
404 worker_core_affinity: args.worker_core_affinity,
405 connection_context,
406 },
407 )
408 .await?;
409 info!(
410 "listening for compute controller connections on {}",
411 args.compute_controller_listen_addr
412 );
413 mz_ore::task::spawn(
414 || "compute_server",
415 transport::serve(
416 args.compute_controller_listen_addr,
417 BUILD_INFO.semver_version(),
418 grpc_host.clone(),
419 Duration::MAX,
420 compute_client_builder,
421 grpc_server_metrics.for_server("compute"),
422 ),
423 );
424
425 future::pending().await
429}