1use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use anyhow::Context;
16use axum::http::StatusCode;
17use axum::routing;
18use fail::FailScenario;
19use futures::future;
20use hyper_util::rt::TokioIo;
21use mz_build_info::{BuildInfo, build_info};
22use mz_cloud_resources::AwsExternalIdPrefix;
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute::server::ComputeInstanceContext;
25use mz_http_util::DynamicFilterTarget;
26use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
27use mz_ore::cli::{self, CliConfig};
28use mz_ore::error::ErrorExt;
29use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
30use mz_ore::netio::{Listener, SocketAddr};
31use mz_ore::now::SYSTEM_TIME;
32use mz_persist_client::cache::PersistClientCache;
33use mz_persist_client::cfg::PersistConfig;
34use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
35use mz_service::emit_boot_diagnostics;
36use mz_service::grpc::GrpcServerMetrics;
37use mz_service::secrets::SecretsReaderCliArgs;
38use mz_service::transport;
39use mz_storage::storage_state::StorageInstanceContext;
40use mz_storage_types::connections::ConnectionContext;
41use mz_txn_wal::operator::TxnsContext;
42use tokio::runtime::Handle;
43use tower::Service;
44use tracing::{error, info};
45
// Provides `usage_metrics::Collector`, which backs the `/api/usage-metrics`
// endpoint of the internal HTTP server set up in `run`.
mod usage_metrics;

/// Build metadata for this binary, produced by the `build_info!` macro.
///
/// Used for tracing configuration, boot diagnostics, the profiling router, the
/// persist configuration, and the version handed to the transport servers.
const BUILD_INFO: BuildInfo = build_info!();

/// Human-readable version string derived from [`BUILD_INFO`].
///
/// Computed lazily on first access; it is the value clap reports as the
/// version of the `clusterd` command.
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
51
/// Command-line arguments for the `clusterd` binary.
// Parsed in `main` through `cli::parse_args` with `env_prefix: Some("CLUSTERD_")`.
// NOTE(review): presumably every `env = "..."` name below is looked up with that
// prefix applied (e.g. `CLUSTERD_PROCESS`) — confirm against `mz_ore::cli`.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    /// The address on which to listen for storage controller connections.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    /// The address on which to listen for compute controller connections.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    /// The address on which to serve the internal HTTP server.
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    /// Optional host name handed to the storage and compute transport servers.
    /// An empty value is treated the same as not providing one.
    // NOTE(review): how `transport::serve` uses the host (e.g. request
    // validation) is not visible in this file — confirm before relying on it.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    /// Timely configuration for the storage server. Its `process` field is
    /// overwritten with the value of `--process`.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    /// Timely configuration for the compute server. Its `process` field is
    /// overwritten with the value of `--process`.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    /// The process value written into both timely configurations.
    // NOTE(review): presumably the index of this process within the cluster —
    // confirm against `TimelyConfig`.
    #[clap(long, env = "PROCESS")]
    process: usize,

    /// The URL of the persist PubSub service to connect to.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    /// An optional AWS external ID prefix, forwarded to the connection context.
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// An optional AWS connection role ARN, forwarded to the connection context.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    /// Options for the secrets reader that is loaded at startup.
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    /// Tracing options.
    #[clap(flatten)]
    tracing: TracingCliArgs,

    /// The environment ID, forwarded to the connection context.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    /// An optional scratch directory. Used as the disk root for usage metrics
    /// and forwarded to the storage and compute instance contexts.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    /// An optional memory limit announced to the persist configuration and the
    /// storage instance context.
    // NOTE(review): unit is not visible here; presumably bytes — confirm.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    /// An optional heap limit. When set, the compute memory limiter is started
    /// with it; when absent, the memory limiter stays disabled.
    // NOTE(review): unit is not visible here; presumably bytes — confirm.
    #[clap(long)]
    heap_limit: Option<usize>,

    /// Sets the `is_cc_active` flag on the persist configuration.
    #[clap(long)]
    is_cc: bool,

    /// Forwarded to the compute instance context as `worker_core_affinity`.
    // NOTE(review): presumably pins timely workers to CPU cores — confirm
    // against `ComputeInstanceContext`.
    #[clap(long)]
    worker_core_affinity: bool,
}
170
171pub fn main() {
172 mz_ore::panic::install_enhanced_handler();
173
174 let args = cli::parse_args(CliConfig {
175 env_prefix: Some("CLUSTERD_"),
176 enable_version_flag: true,
177 });
178
179 let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
180 let runtime = tokio::runtime::Builder::new_multi_thread()
181 .worker_threads(ncpus_useful)
182 .thread_stack_size(3 * 1024 * 1024) .thread_name_fn(|| {
188 use std::sync::atomic::{AtomicUsize, Ordering};
189 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
190 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
191 format!("tokio:work-{}", id)
192 })
193 .enable_all()
194 .build()
195 .unwrap();
196 if let Err(err) = runtime.block_on(run(args)) {
197 panic!("clusterd: fatal: {}", err.display_with_causes());
198 }
199}
200
201async fn run(args: Args) -> Result<(), anyhow::Error> {
202 let metrics_registry = MetricsRegistry::new();
203 let (tracing_handle, _tracing_guard) = args
204 .tracing
205 .configure_tracing(
206 StaticTracingConfig {
207 service_name: "clusterd",
208 build_info: BUILD_INFO,
209 },
210 metrics_registry.clone(),
211 )
212 .await?;
213
214 let tracing_handle = Arc::new(tracing_handle);
215 register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);
216
217 mz_timely_util::panic::halt_on_timely_communication_panic();
220
221 let _failpoint_scenario = FailScenario::setup();
222
223 emit_boot_diagnostics!(&BUILD_INFO);
224
225 mz_alloc::register_metrics_into(&metrics_registry).await;
226 mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;
227
228 if let Some(heap_limit) = args.heap_limit {
229 mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
230 } else {
231 info!("no heap limit announced; disabling memory limiter");
232 }
233
234 let secrets_reader = args
235 .secrets
236 .load()
237 .await
238 .context("loading secrets reader")?;
239
240 let usage_collector = Arc::new(usage_metrics::Collector {
241 disk_root: args.scratch_directory.clone(),
242 });
243
244 mz_ore::task::spawn(|| "clusterd_internal_http_server", {
245 let metrics_registry = metrics_registry.clone();
246 tracing::info!(
247 "serving internal HTTP server on {}",
248 args.internal_http_listen_addr
249 );
250 let listener = Listener::bind(args.internal_http_listen_addr).await?;
251 let mut make_service = mz_prof_http::router(&BUILD_INFO)
252 .route(
253 "/api/livez",
254 routing::get(mz_http_util::handle_liveness_check),
255 )
256 .route(
257 "/metrics",
258 routing::get(move || async move {
259 mz_http_util::handle_prometheus(&metrics_registry).await
260 }),
261 )
262 .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
263 .route(
264 "/api/opentelemetry/config",
265 routing::put({
266 move |_: axum::Json<DynamicFilterTarget>| async {
267 (
268 StatusCode::BAD_REQUEST,
269 "This endpoint has been replaced. \
270 Use the `opentelemetry_filter` system variable."
271 .to_string(),
272 )
273 }
274 }),
275 )
276 .route(
277 "/api/stderr/config",
278 routing::put({
279 move |_: axum::Json<DynamicFilterTarget>| async {
280 (
281 StatusCode::BAD_REQUEST,
282 "This endpoint has been replaced. \
283 Use the `log_filter` system variable."
284 .to_string(),
285 )
286 }
287 }),
288 )
289 .route(
290 "/api/usage-metrics",
291 routing::get(async move || axum::Json(usage_collector.collect())),
292 )
293 .into_make_service();
294
295 async move {
298 loop {
299 let (conn, remote_addr) = match listener.accept().await {
300 Ok(peer) => peer,
301 Err(error) => {
302 error!("internal_http connection failed: {error:#}");
303 break;
304 }
305 };
306
307 let tower_service = make_service.call(&conn).await.expect("infallible");
308 let hyper_service =
309 hyper::service::service_fn(move |req| tower_service.clone().call(req));
310
311 mz_ore::task::spawn(
312 || format!("clusterd_internal_http_server:{remote_addr}"),
313 async move {
314 if let Err(error) = hyper::server::conn::http1::Builder::new()
315 .serve_connection(TokioIo::new(conn), hyper_service)
316 .await
317 {
318 error!("failed to serve internal_http connection: {error:#}");
319 }
320 },
321 );
322 }
323 }
324 });
325
326 let pubsub_caller_id = std::env::var("HOSTNAME")
327 .ok()
328 .or_else(|| args.tracing.log_prefix.clone())
329 .unwrap_or_default();
330 let mut persist_cfg =
331 PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
332 persist_cfg.is_cc_active = args.is_cc;
333 persist_cfg.announce_memory_limit = args.announce_memory_limit;
334 persist_cfg.disable_compaction();
336
337 let persist_clients = Arc::new(PersistClientCache::new(
338 persist_cfg,
339 &metrics_registry,
340 |persist_cfg, metrics| {
341 let cfg = PersistPubSubClientConfig {
342 url: args.persist_pubsub_url,
343 caller_id: pubsub_caller_id,
344 persist_cfg: persist_cfg.clone(),
345 };
346 GrpcPubSubClient::connect(cfg, metrics)
347 },
348 ));
349 let txns_ctx = TxnsContext::default();
350
351 let connection_context = ConnectionContext::from_cli_args(
352 args.environment_id,
353 &args.tracing.startup_log_filter,
354 args.aws_external_id_prefix,
355 args.aws_connection_role_arn,
356 secrets_reader,
357 None,
358 );
359
360 let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
361 let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);
362
363 let mut storage_timely_config = args.storage_timely_config;
364 storage_timely_config.process = args.process;
365 let mut compute_timely_config = args.compute_timely_config;
366 compute_timely_config.process = args.process;
367
368 let storage_client_builder = mz_storage::serve(
370 storage_timely_config,
371 &metrics_registry,
372 Arc::clone(&persist_clients),
373 txns_ctx.clone(),
374 Arc::clone(&tracing_handle),
375 SYSTEM_TIME.clone(),
376 connection_context.clone(),
377 StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
378 )
379 .await?;
380 info!(
381 "listening for storage controller connections on {}",
382 args.storage_controller_listen_addr
383 );
384 mz_ore::task::spawn(
385 || "storage_server",
386 transport::serve(
387 args.storage_controller_listen_addr,
388 BUILD_INFO.semver_version(),
389 grpc_host.clone(),
390 Duration::MAX,
391 storage_client_builder,
392 grpc_server_metrics.for_server("storage"),
393 ),
394 );
395
396 let compute_client_builder = mz_compute::server::serve(
398 compute_timely_config,
399 &metrics_registry,
400 persist_clients,
401 txns_ctx,
402 tracing_handle,
403 ComputeInstanceContext {
404 scratch_directory: args.scratch_directory,
405 worker_core_affinity: args.worker_core_affinity,
406 connection_context,
407 },
408 )
409 .await?;
410 info!(
411 "listening for compute controller connections on {}",
412 args.compute_controller_listen_addr
413 );
414 mz_ore::task::spawn(
415 || "compute_server",
416 transport::serve(
417 args.compute_controller_listen_addr,
418 BUILD_INFO.semver_version(),
419 grpc_host.clone(),
420 Duration::MAX,
421 compute_client_builder,
422 grpc_server_metrics.for_server("compute"),
423 ),
424 );
425
426 future::pending().await
430}