use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::Context;
use axum::http::StatusCode;
use axum::routing;
use fail::FailScenario;
use futures::future;
use hyper_util::rt::TokioIo;
use mz_build_info::{BuildInfo, build_info};
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cluster_client::client::TimelyConfig;
use mz_compute::server::ComputeInstanceContext;
use mz_compute_client::service::proto_compute_server::ProtoComputeServer;
use mz_http_util::DynamicFilterTarget;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::cli::{self, CliConfig};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
use mz_ore::netio::{Listener, SocketAddr};
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
use mz_service::emit_boot_diagnostics;
use mz_service::grpc::{GrpcServer, GrpcServerMetrics, MAX_GRPC_MESSAGE_SIZE};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_service::transport;
use mz_storage::storage_state::StorageInstanceContext;
use mz_storage_client::client::proto_storage_server::ProtoStorageServer;
use mz_storage_types::connections::ConnectionContext;
use mz_txn_wal::operator::TxnsContext;
use tokio::runtime::Handle;
use tower::Service;
use tracing::{error, info, warn};

mod usage_metrics;

const BUILD_INFO: BuildInfo = build_info!();

pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));

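/// Command-line arguments for `clusterd`, which hosts both a storage runtime
/// and a compute runtime and serves controller connections for each.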
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
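    /// The address on which to listen for connections from the storage controller.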
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
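    /// The address on which to listen for connections from the compute controller.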
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
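    /// The address on which to serve the internal HTTP API (liveness, metrics,
    /// profiling, tracing, and usage-metrics endpoints).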
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,
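    /// Whether to serve controller connections over the `mz_service::transport`
    /// protocol (CTP) instead of gRPC.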
    #[clap(long, env = "USE_CTP")]
    use_ctp: bool,

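    /// The Timely worker configuration for the storage runtime.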
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
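    /// The Timely worker configuration for the compute runtime.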
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
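    /// The index of this process within the Timely cluster; applied to both
    /// Timely configurations in `run`.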
    #[clap(long, env = "PROCESS")]
    process: usize,

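    /// The URL of the Persist PubSub service to connect to.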
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    #[clap(flatten)]
    tracing: TracingCliArgs,

    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

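    /// The memory limit of this process, in bytes, if known. Used to configure
    /// the lgalloc and memory limiters and announced to Persist.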
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    #[clap(long)]
    is_cc: bool,

    #[clap(long)]
    worker_core_affinity: bool,
}

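/// Entry point: parses command-line arguments (with the `CLUSTERD_` environment
/// prefix), builds a multi-threaded Tokio runtime sized to the number of useful
/// CPU cores, and drives `run`, panicking on a fatal error.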
pub fn main() {
    mz_ore::panic::install_enhanced_handler();

    let args = cli::parse_args(CliConfig {
        env_prefix: Some("CLUSTERD_"),
        enable_version_flag: true,
    });

    let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(ncpus_useful)
        .thread_name_fn(|| {
            use std::sync::atomic::{AtomicUsize, Ordering};
            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
            let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
            format!("tokio:work-{}", id)
        })
        .enable_all()
        .build()
        .unwrap();
    if let Err(err) = runtime.block_on(run(args)) {
        panic!("clusterd: fatal: {}", err.display_with_causes());
    }
}

async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let (tracing_handle, _tracing_guard) = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    mz_timely_util::panic::halt_on_timely_communication_panic();

    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    if let Some(memory_limit) = args.announce_memory_limit {
        mz_compute::lgalloc::start_limiter(memory_limit, &metrics_registry);
        mz_compute::memory_limiter::start_limiter(memory_limit, &metrics_registry);
    } else {
        warn!("no memory limit announced; disabling lgalloc limiter");
    }

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    let usage_collector = Arc::new(usage_metrics::Collector {
        disk_root: args.scratch_directory.clone(),
    });

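    // Serve the internal HTTP API (liveness, metrics, profiling, tracing, and
    // usage-metrics endpoints) on a background task.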
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                             Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                             Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/usage-metrics",
                routing::get(async move || axum::Json(usage_collector.collect())),
            )
            .into_make_service();

        async move {
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        error!("internal_http connection failed: {error:#}");
                        break;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            error!("failed to serve internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

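    // Set up the Persist client cache; its PubSub client connects to the
    // configured Persist PubSub URL.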
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    persist_cfg.disable_compaction();

    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

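    // Stamp this process's index into both Timely configurations.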
    let mut storage_timely_config = args.storage_timely_config;
    storage_timely_config.process = args.process;
    let mut compute_timely_config = args.compute_timely_config;
    compute_timely_config.process = args.process;

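    // Start the storage runtime and serve storage controller connections over
    // CTP or gRPC, depending on `--use-ctp`.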
    let storage_client_builder = mz_storage::serve(
        storage_timely_config,
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
    )
    .await?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    let storage_serve = if args.use_ctp {
        future::Either::Left(transport::serve(
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            storage_client_builder,
            grpc_server_metrics.for_server("storage"),
        ))
    } else {
        future::Either::Right(GrpcServer::serve(
            &grpc_server_metrics,
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            storage_client_builder,
            |svc| ProtoStorageServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ))
    };
    mz_ore::task::spawn(|| "storage_server", storage_serve);

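    // Start the compute runtime and serve compute controller connections in the
    // same way.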
    let compute_client_builder = mz_compute::server::serve(
        compute_timely_config,
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
    )
    .await?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    let compute_serve = if args.use_ctp {
        future::Either::Left(transport::serve(
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            compute_client_builder,
            grpc_server_metrics.for_server("compute"),
        ))
    } else {
        future::Either::Right(GrpcServer::serve(
            &grpc_server_metrics,
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host,
            compute_client_builder,
            |svc| ProtoComputeServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ))
    };
    mz_ore::task::spawn(|| "compute_server", compute_serve);

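    // The servers run on background tasks; park this task forever.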
    future::pending().await
}