1use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use anyhow::Context;
16use axum::http::StatusCode;
17use axum::routing;
18use fail::FailScenario;
19use futures::future;
20use hyper_util::rt::TokioIo;
21use mz_build_info::{BuildInfo, build_info};
22use mz_cloud_resources::AwsExternalIdPrefix;
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute::server::ComputeInstanceContext;
25use mz_http_util::DynamicFilterTarget;
26use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
27use mz_ore::cli::{self, CliConfig};
28use mz_ore::error::ErrorExt;
29use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
30use mz_ore::netio::{Listener, SocketAddr};
31use mz_ore::now::SYSTEM_TIME;
32use mz_persist_client::cache::PersistClientCache;
33use mz_persist_client::cfg::PersistConfig;
34use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
35use mz_service::emit_boot_diagnostics;
36use mz_service::grpc::GrpcServerMetrics;
37use mz_service::secrets::SecretsReaderCliArgs;
38use mz_service::transport;
39use mz_storage::storage_state::StorageInstanceContext;
40use mz_storage_types::connections::ConnectionContext;
41use mz_txn_wal::operator::TxnsContext;
42use tokio::runtime::Handle;
43use tower::Service;
44use tracing::{Instrument, debug, error, info, info_span};
45
46mod usage_metrics;
47
/// Build metadata for this binary, baked in at compile time by `build_info!`.
const BUILD_INFO: BuildInfo = build_info!();

/// Lazily-computed human-readable version string; used as the clap `version`
/// for the `Args` parser below.
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
51
/// Command-line arguments for `clusterd`.
///
/// NOTE(review): `main` parses these with `env_prefix: Some("CLUSTERD_")`, so
/// the environment variable names shown here are presumably read with a
/// `CLUSTERD_` prefix — confirm against `mz_ore::cli` semantics.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    /// Address to listen on for storage controller connections.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    /// Address to listen on for compute controller connections.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    /// Address to listen on for the internal HTTP server (profiling,
    /// liveness, Prometheus metrics, tracing config, usage metrics).
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    /// Optional host name handed to the transport servers; `run` treats an
    /// empty string the same as an absent value.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    /// Timely cluster configuration for the storage server; its `process`
    /// field is overwritten with `--process` at startup.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    /// Timely cluster configuration for the compute server; its `process`
    /// field is overwritten with `--process` at startup.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    /// Index of this process within the Timely cluster.
    #[clap(long, env = "PROCESS")]
    process: usize,

    /// URL of the persist PubSub service to connect to.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    /// Prefix for AWS external IDs, forwarded to the connection context.
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// ARN of an AWS role to use for connections, forwarded to the
    /// connection context.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    /// Configuration for the secrets reader (flattened sub-arguments).
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    /// Tracing and observability configuration (flattened sub-arguments).
    #[clap(flatten)]
    tracing: TracingCliArgs,

    /// Identifier of the environment this process serves, forwarded to the
    /// connection context.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    /// Optional scratch directory; used as the usage-metrics disk root and
    /// passed to the storage and compute instance contexts.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    /// Memory limit to announce via the persist config and the storage
    /// instance context.
    /// NOTE(review): unit (bytes?) is not visible in this file — confirm
    /// against `PersistConfig::announce_memory_limit`.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    /// Heap limit enforced by the compute memory limiter; when absent, the
    /// limiter is disabled (see `run`).
    #[clap(long)]
    heap_limit: Option<usize>,

    /// Sets `is_cc_active` on the persist config.
    #[clap(long)]
    is_cc: bool,

    /// Whether compute Timely workers should set core affinity; forwarded to
    /// `ComputeInstanceContext`.
    #[clap(long)]
    worker_core_affinity: bool,
}
170
171pub fn main() {
172 mz_ore::panic::install_enhanced_handler();
173
174 let args = cli::parse_args(CliConfig {
175 env_prefix: Some("CLUSTERD_"),
176 enable_version_flag: true,
177 });
178
179 let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
180 let runtime = tokio::runtime::Builder::new_multi_thread()
181 .worker_threads(ncpus_useful)
182 .thread_stack_size(3 * 1024 * 1024) .thread_name_fn(|| {
188 use std::sync::atomic::{AtomicUsize, Ordering};
189 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
190 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
191 format!("tokio:work-{}", id)
192 })
193 .enable_all()
194 .build()
195 .unwrap();
196 if let Err(err) = runtime.block_on(run(args)) {
197 panic!("clusterd: fatal: {}", err.display_with_causes());
198 }
199}
200
/// Async entry point for `clusterd`, run on the main Tokio runtime.
///
/// Startup sequence: configure tracing and metrics, start the internal HTTP
/// server, build the shared persist/transaction/connection contexts, boot the
/// storage and compute Timely servers, and then park forever — all further
/// work happens on spawned tasks. Errors are only returned during startup;
/// once both servers are listening this function never returns.
async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let tracing_handle = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    let tracing_handle = Arc::new(tracing_handle);
    // Expose Tokio runtime metrics for the main runtime under the "main" label.
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    // A panic on a Timely communication thread should halt the whole process
    // rather than leave it in a partially-working state.
    mz_timely_util::panic::halt_on_timely_communication_panic();

    // Keep the failpoint scenario alive for the lifetime of the process so any
    // configured failpoints remain active.
    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    // The memory limiter only runs when a heap limit was given on the CLI.
    if let Some(heap_limit) = args.heap_limit {
        mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
    } else {
        info!("no heap limit announced; disabling memory limiter");
    }

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    // Backs the `/api/usage-metrics` endpoint; reads disk usage under the
    // scratch directory (if one was configured).
    let usage_collector = Arc::new(usage_metrics::Collector {
        disk_root: args.scratch_directory.clone(),
    });

    // Internal HTTP server. Note: the listener is bound *here*, in `run`'s
    // context (so a bind failure propagates via `?` at startup); only the
    // accept loop below runs on the spawned task.
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            // The two PUT config endpoints below are retired; they answer 400
            // and point callers at the replacement system variables.
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/usage-metrics",
                routing::get(async move || axum::Json(usage_collector.collect())),
            )
            .into_make_service();

        async move {
            // Hand-rolled accept loop so per-connection errors are non-fatal
            // and the server keeps accepting.
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        if is_connection_error(&error) {
                            // Peer-side failure (refused/aborted/reset):
                            // nothing to serve, just move on.
                            debug!("accepted connection already errored: {error:#}");
                        } else {
                            // Unexpected accept error (e.g. resource
                            // exhaustion): log and back off briefly to avoid
                            // a hot error loop.
                            error!("internal_http accept error: {error:#}");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        continue;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                // One task per connection so a slow client cannot stall the
                // accept loop.
                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            info!("error serving internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

    // Identify this process to the persist PubSub service: prefer $HOSTNAME,
    // fall back to the tracing log prefix, else empty.
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // Compaction is disabled in this process.
    // NOTE(review): presumably compaction is performed elsewhere — confirm.
    persist_cfg.disable_compaction();

    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

    // Treat an empty `--grpc-host` as if it were absent.
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

    // Both Timely configs take this process's index within the cluster.
    let mut storage_timely_config = args.storage_timely_config;
    storage_timely_config.process = args.process;
    let mut compute_timely_config = args.compute_timely_config;
    compute_timely_config.process = args.process;

    // Boot the storage server, then serve controller connections for it on a
    // spawned task.
    let storage_client_builder = mz_storage::serve(
        storage_timely_config,
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
    )
    .await?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "storage_server",
        transport::serve(
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            storage_client_builder,
            grpc_server_metrics.for_server("storage"),
        )
        .instrument(info_span!("ctp", name = "storage")),
    );

    // Boot the compute server, consuming the remaining (non-cloned) shared
    // contexts, and serve its controller connections likewise.
    let compute_client_builder = mz_compute::server::serve(
        compute_timely_config,
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
    )
    .await?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "compute_server",
        transport::serve(
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            compute_client_builder,
            grpc_server_metrics.for_server("compute"),
        )
        .instrument(info_span!("ctp", name = "compute")),
    );

    // Everything interesting now runs on spawned tasks; park forever.
    future::pending().await
}
445
/// Reports whether an I/O error represents a per-connection failure (the peer
/// refused, aborted, or reset the connection), as opposed to a listener-level
/// problem. The internal HTTP accept loop treats these as ignorable.
fn is_connection_error(e: &std::io::Error) -> bool {
    use std::io::ErrorKind;
    [
        ErrorKind::ConnectionRefused,
        ErrorKind::ConnectionAborted,
        ErrorKind::ConnectionReset,
    ]
    .contains(&e.kind())
}