1use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use anyhow::Context;
16use axum::http::StatusCode;
17use axum::routing;
18use fail::FailScenario;
19use futures::future;
20use hyper_util::rt::TokioIo;
21use mz_build_info::{BuildInfo, build_info};
22use mz_cloud_resources::AwsExternalIdPrefix;
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute::server::ComputeInstanceContext;
25use mz_http_util::DynamicFilterTarget;
26use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
27use mz_ore::cli::{self, CliConfig};
28use mz_ore::error::ErrorExt;
29use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
30use mz_ore::netio::{Listener, SocketAddr};
31use mz_ore::now::SYSTEM_TIME;
32use mz_persist_client::cache::PersistClientCache;
33use mz_persist_client::cfg::PersistConfig;
34use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
35use mz_service::emit_boot_diagnostics;
36use mz_service::grpc::GrpcServerMetrics;
37use mz_service::secrets::SecretsReaderCliArgs;
38use mz_service::transport;
39use mz_storage::storage_state::StorageInstanceContext;
40use mz_storage_types::connections::ConnectionContext;
41use mz_timely_util::capture::arc_event_link;
42use mz_txn_wal::operator::TxnsContext;
43use tokio::runtime::Handle;
44use tower::Service;
45use tracing::{Instrument, debug, error, info, info_span};
46
// Collection of process usage metrics (e.g. disk usage) exposed over HTTP below.
mod usage_metrics;

// Build metadata (version, commit, etc.) captured at compile time.
const BUILD_INFO: BuildInfo = build_info!();

// Human-readable version string; computed lazily on first access and reused
// (also referenced by the clap `version` attribute on `Args`).
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
52
// Command-line arguments for the `clusterd` binary.
//
// NOTE(review): plain `//` comments are used on purpose — `///` doc comments on
// clap-derived fields would become `--help` text and change CLI output.
// All flags can also be set via environment variables (with the `CLUSTERD_`
// prefix applied in `main`, e.g. `CLUSTERD_STORAGE_CONTROLLER_LISTEN_ADDR`).
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // Address on which to listen for storage controller connections.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    // Address on which to listen for compute controller connections.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    // Address of the internal HTTP server (liveness, metrics, profiling,
    // tracing, and usage-metrics endpoints — see the router in `run`).
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    // Optional host name passed through to the transport servers; an empty
    // string is treated as unset (see `grpc_host` handling in `run`).
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    // Timely configuration for the storage runtime; its `process` field is
    // overwritten with `--process` in `run`.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    // Timely configuration for the compute runtime; must agree with the
    // storage config on workers-per-process (asserted in `run`).
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    // Index of this process within the Timely cluster.
    #[clap(long, env = "PROCESS")]
    process: usize,

    // URL of the persist PubSub service to connect to.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // AWS external ID prefix for connections that assume AWS roles.
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    // ARN of the AWS role to assume for AWS connections, if any.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // Configuration for reading secrets (flattened into this CLI).
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // Tracing/observability configuration (flattened into this CLI).
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // Identifier of the environment this process belongs to.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    // Optional scratch directory for spill-to-disk state; also used as the
    // root for disk usage metrics.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    // Memory limit (bytes, presumably) announced to persist — TODO confirm units.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    // Heap limit that enables the compute memory limiter; when absent the
    // limiter is disabled (see `run`).
    #[clap(long)]
    heap_limit: Option<usize>,

    // Whether this replica runs in "cc" mode; forwarded to persist config.
    #[clap(long)]
    is_cc: bool,

    // Whether to pin Timely workers to CPU cores.
    #[clap(long)]
    worker_core_affinity: bool,

    // Whether to wire up storage introspection log event links.
    #[clap(long)]
    enable_storage_introspection_logs: bool,
}
176
177pub fn main() {
178 mz_ore::panic::install_enhanced_handler();
179
180 let args = cli::parse_args(CliConfig {
181 env_prefix: Some("CLUSTERD_"),
182 enable_version_flag: true,
183 });
184
185 let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
186 let runtime = tokio::runtime::Builder::new_multi_thread()
187 .worker_threads(ncpus_useful)
188 .thread_stack_size(3 * 1024 * 1024) .thread_name_fn(|| {
194 use std::sync::atomic::{AtomicUsize, Ordering};
195 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
196 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
197 format!("tokio:work-{}", id)
198 })
199 .enable_all()
200 .build()
201 .unwrap();
202 if let Err(err) = runtime.block_on(run(args)) {
203 panic!("clusterd: fatal: {}", err.display_with_causes());
204 }
205}
206
/// Runs the clusterd process: sets up tracing and metrics, starts the internal
/// HTTP server, constructs persist and connection contexts, and serves both
/// the storage and compute controller endpoints. Never returns successfully —
/// on success it awaits forever via `future::pending()`.
async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let tracing_handle = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    // Shared between the storage and compute servers below.
    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    mz_timely_util::panic::halt_on_timely_communication_panic();

    // Keep failpoints active for the lifetime of the process.
    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    // The memory limiter only runs when a heap limit was provided.
    if let Some(heap_limit) = args.heap_limit {
        mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
    } else {
        info!("no heap limit announced; disabling memory limiter");
    }

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    // Usage metrics (e.g. disk usage) are collected from the scratch directory.
    let usage_collector = Arc::new(usage_metrics::Collector {
        disk_root: args.scratch_directory.clone(),
    });

    // Internal HTTP server for liveness checks, Prometheus metrics, profiling,
    // tracing, and usage metrics. Note: the bind and router construction run
    // in *this* async context (the spawn argument is a block that ends in an
    // `async move` future), so a bind failure propagates out of `run` via `?`
    // before the accept loop ever starts.
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            // Deprecated endpoints: kept registered so callers get an
            // explanatory 400 rather than a 404.
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/usage-metrics",
                routing::get(async move || axum::Json(usage_collector.collect())),
            )
            .into_make_service();

        // Accept loop: serve each connection on its own task.
        async move {
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        if is_connection_error(&error) {
                            // Peer-side connection errors are expected churn.
                            debug!("accepted connection already errored: {error:#}");
                        } else {
                            // Other accept errors may be persistent (e.g. fd
                            // exhaustion); back off before retrying.
                            error!("internal_http accept error: {error:#}");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        continue;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            info!("error serving internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

    // Identify this process to persist PubSub: prefer $HOSTNAME, fall back to
    // the tracing log prefix, else empty.
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // NOTE(review): compaction is disabled for this process — presumably it is
    // handled elsewhere; confirm before relying on this.
    persist_cfg.disable_compaction();

    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

    // An empty --grpc-host is treated the same as an absent one.
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

    // Stamp this process's index into both Timely configs.
    let mut storage_timely_config = args.storage_timely_config;
    storage_timely_config.process = args.process;
    let mut compute_timely_config = args.compute_timely_config;
    compute_timely_config.process = args.process;

    // The introspection-log links below are allocated per worker, so the two
    // runtimes must agree on workers-per-process.
    assert_eq!(
        storage_timely_config.workers, compute_timely_config.workers,
        "storage and compute must have equal workers-per-process",
    );

    // One writer/reader event-link pair per worker; storage writes the
    // introspection events, compute reads them.
    let (storage_log_writers, storage_log_readers) = if args.enable_storage_introspection_logs {
        (0..storage_timely_config.workers)
            .map(|_| arc_event_link())
            .unzip()
    } else {
        (Vec::new(), Vec::new())
    };

    // Start the storage runtime and serve its controller endpoint.
    let storage_client_builder = mz_storage::serve(
        storage_timely_config,
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
        storage_log_writers,
    )
    .await?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "storage_server",
        transport::serve(
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            storage_client_builder,
            grpc_server_metrics.for_server("storage"),
        )
        .instrument(info_span!("ctp", name = "storage")),
    );

    // Start the compute runtime and serve its controller endpoint. This takes
    // ownership of `persist_clients`, `txns_ctx`, `tracing_handle`, and
    // `connection_context` (storage above received clones).
    let compute_client_builder = mz_compute::server::serve(
        compute_timely_config,
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
        storage_log_readers,
    )
    .await?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "compute_server",
        transport::serve(
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            compute_client_builder,
            grpc_server_metrics.for_server("compute"),
        )
        .instrument(info_span!("ctp", name = "compute")),
    );

    // Everything interesting now runs in spawned tasks; park this future forever.
    future::pending().await
}
468
/// Reports whether `e` is a per-connection error (refused/aborted/reset) —
/// i.e. one attributable to a single peer rather than to the listener itself.
fn is_connection_error(e: &std::io::Error) -> bool {
    use std::io::ErrorKind;
    [
        ErrorKind::ConnectionRefused,
        ErrorKind::ConnectionAborted,
        ErrorKind::ConnectionReset,
    ]
    .contains(&e.kind())
}