1use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use anyhow::Context;
16use axum::http::StatusCode;
17use axum::routing;
18use fail::FailScenario;
19use futures::future;
20use hyper_util::rt::TokioIo;
21use mz_build_info::{BuildInfo, build_info};
22use mz_cloud_resources::AwsExternalIdPrefix;
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute::server::ComputeInstanceContext;
25use mz_http_util::DynamicFilterTarget;
26use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
27use mz_ore::cli::{self, CliConfig};
28use mz_ore::error::ErrorExt;
29use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
30use mz_ore::netio::{Listener, SocketAddr};
31use mz_ore::now::SYSTEM_TIME;
32use mz_persist_client::cache::PersistClientCache;
33use mz_persist_client::cfg::PersistConfig;
34use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
35use mz_service::emit_boot_diagnostics;
36use mz_service::grpc::GrpcServerMetrics;
37use mz_service::secrets::SecretsReaderCliArgs;
38use mz_service::transport;
39use mz_storage::storage_state::StorageInstanceContext;
40use mz_storage_types::connections::ConnectionContext;
41use mz_txn_wal::operator::TxnsContext;
42use tokio::runtime::Handle;
43use tower::Service;
44use tracing::{Instrument, error, info, info_span};
45
// Supplies the `Collector` whose `collect()` output is served as JSON by the
// internal HTTP server's `/api/usage-metrics` route (see `run`).
mod usage_metrics;

/// Build metadata for this binary, produced by the `build_info!()` macro.
const BUILD_INFO: BuildInfo = build_info!();

/// Human-readable version string, computed lazily on first access from
/// [`BUILD_INFO`]. It is used as the clap `version` value for `clusterd`.
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
51
// Command-line arguments for the `clusterd` binary.
//
// NOTE: plain `//` comments are used on purpose throughout this struct. clap's
// derive macro turns `///` doc comments into `--help` text, so adding them
// would change user-visible output.
//
// `main` hands the parser an `env_prefix` of `CLUSTERD_`; presumably that
// prefix is prepended to the `env = "..."` names below — confirm in
// `mz_ore::cli`.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // Address on which the storage controller transport server listens.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    // Address on which the compute controller transport server listens.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    // Address of the internal HTTP server (liveness, metrics, tracing, usage
    // metrics, plus whatever `mz_prof_http::router` provides).
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    // Optional host name handed to both transport servers. An empty string is
    // treated the same as "not set" by `run`.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    // Timely configuration for the storage runtime. Its `process` field is
    // overwritten with `process` below before use.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    // Timely configuration for the compute runtime. Its `process` field is
    // overwritten with `process` below before use.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    // Value written into the `process` field of both Timely configurations.
    #[clap(long, env = "PROCESS")]
    process: usize,

    // URL used for the persist PubSub client configuration.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // Optional AWS external ID prefix, forwarded to
    // `ConnectionContext::from_cli_args`.
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    // Optional AWS connection role ARN, forwarded to
    // `ConnectionContext::from_cli_args`.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // Options controlling how the secrets reader is loaded (flattened into
    // this argument set).
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // Tracing/logging options (flattened into this argument set).
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // Environment identifier, forwarded to `ConnectionContext::from_cli_args`.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    // Optional scratch directory. Used as the usage collector's `disk_root`
    // and passed to both the storage and compute instance contexts.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    // Optional memory limit recorded in the persist configuration and passed
    // to the storage instance context. Presumably in bytes — TODO confirm.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    // Optional heap limit. When set, `run` starts the compute memory limiter
    // with it; when absent the limiter stays disabled. Presumably in bytes —
    // TODO confirm.
    #[clap(long)]
    heap_limit: Option<usize>,

    // Copied into the persist configuration's `is_cc_active` flag.
    #[clap(long)]
    is_cc: bool,

    // Forwarded to the compute instance context's `worker_core_affinity`.
    #[clap(long)]
    worker_core_affinity: bool,
}
170
171pub fn main() {
172 mz_ore::panic::install_enhanced_handler();
173
174 let args = cli::parse_args(CliConfig {
175 env_prefix: Some("CLUSTERD_"),
176 enable_version_flag: true,
177 });
178
179 let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
180 let runtime = tokio::runtime::Builder::new_multi_thread()
181 .worker_threads(ncpus_useful)
182 .thread_stack_size(3 * 1024 * 1024) .thread_name_fn(|| {
188 use std::sync::atomic::{AtomicUsize, Ordering};
189 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
190 let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
191 format!("tokio:work-{}", id)
192 })
193 .enable_all()
194 .build()
195 .unwrap();
196 if let Err(err) = runtime.block_on(run(args)) {
197 panic!("clusterd: fatal: {}", err.display_with_causes());
198 }
199}
200
/// Boots all clusterd subsystems and then parks forever.
///
/// In order, this:
/// 1. configures tracing and registers runtime/allocator/dyncfg metrics,
/// 2. optionally starts the compute memory limiter,
/// 3. loads the secrets reader,
/// 4. binds and spawns the internal HTTP server,
/// 5. builds the persist client cache and the connection context,
/// 6. starts the storage server and its controller-facing transport server,
/// 7. starts the compute server and its controller-facing transport server.
///
/// # Errors
///
/// Returns an error if tracing configuration, secrets loading, binding the
/// internal HTTP listener, constructing the storage instance context, or
/// starting either the storage or compute server fails. On success the
/// function never returns: it ends by awaiting a future that never resolves.
async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let tracing_handle = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    // Shared between the storage and compute servers below.
    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    // NOTE(review): this is called after tracing has been configured; the
    // relative order of panic-related setup presumably matters — confirm
    // before moving this call.
    mz_timely_util::panic::halt_on_timely_communication_panic();

    // Bound to a named variable so the scenario guard lives for the rest of
    // this function instead of being dropped immediately.
    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    // The memory limiter is only started when `--heap-limit` was supplied.
    if let Some(heap_limit) = args.heap_limit {
        mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
    } else {
        info!("no heap limit announced; disabling memory limiter");
    }

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    // Backs the `/api/usage-metrics` route of the internal HTTP server.
    let usage_collector = Arc::new(usage_metrics::Collector {
        disk_root: args.scratch_directory.clone(),
    });

    // The block passed to `spawn` runs inline, here, and evaluates to the
    // future that is spawned. Consequently the `?` on `Listener::bind`
    // propagates a bind failure out of `run` itself rather than out of the
    // spawned task.
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        // Start from the profiling router and add clusterd's own routes.
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            // The two `.../config` PUT routes below are retired: they always
            // answer 400 and point the caller at the replacement system
            // variable.
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                            Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/usage-metrics",
                routing::get(async move || axum::Json(usage_collector.collect())),
            )
            .into_make_service();

        async move {
            // Accept loop: every accepted connection is served over HTTP/1 on
            // its own task.
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        // NOTE(review): an accept error ends the loop, so no
                        // further internal HTTP connections are accepted for
                        // the life of the process — confirm this is intended.
                        error!("internal_http connection failed: {error:#}");
                        break;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                // Adapt the tower service to hyper's service interface; it is
                // cloned for each request.
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            // Per-connection failures are logged at info level
                            // and do not affect other connections.
                            info!("error serving internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

    // PubSub caller id: the `HOSTNAME` environment variable if readable,
    // otherwise the tracing log prefix, otherwise the empty string.
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // Compaction starts out disabled here. NOTE(review): presumably it is
    // re-enabled elsewhere at a later point — confirm.
    persist_cfg.disable_compaction();

    // One persist client cache is shared by the storage and compute servers.
    // The closure builds the PubSub client from the cache's own configuration.
    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

    // Treat an empty `--grpc-host` the same as an absent one.
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

    // Both Timely configurations take their process index from `--process`,
    // overriding whatever the parsed configurations carried.
    let mut storage_timely_config = args.storage_timely_config;
    storage_timely_config.process = args.process;
    let mut compute_timely_config = args.compute_timely_config;
    compute_timely_config.process = args.process;

    // Start the storage server. Shared handles are cloned because the compute
    // server below needs them too.
    let storage_client_builder = mz_storage::serve(
        storage_timely_config,
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
    )
    .await?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    // Serve storage controller connections on a background task, inside a
    // `ctp` span tagged `name = "storage"`.
    mz_ore::task::spawn(
        || "storage_server",
        transport::serve(
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            storage_client_builder,
            grpc_server_metrics.for_server("storage"),
        )
        .instrument(info_span!("ctp", name = "storage")),
    );

    // Start the compute server. This is the last user of the shared handles,
    // so they are moved rather than cloned.
    let compute_client_builder = mz_compute::server::serve(
        compute_timely_config,
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
    )
    .await?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    // Serve compute controller connections on a background task, inside a
    // `ctp` span tagged `name = "compute"`.
    mz_ore::task::spawn(
        || "compute_server",
        transport::serve(
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            compute_client_builder,
            grpc_server_metrics.for_server("compute"),
        )
        .instrument(info_span!("ctp", name = "compute")),
    );

    // Never resolves: `main` blocks on this function, so parking here keeps
    // the runtime and the spawned server tasks alive.
    future::pending().await
}