// mz_clusterd/lib.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use anyhow::Context;
16use axum::http::StatusCode;
17use axum::routing;
18use fail::FailScenario;
19use futures::future;
20use hyper_util::rt::TokioIo;
21use mz_build_info::{BuildInfo, build_info};
22use mz_cloud_resources::AwsExternalIdPrefix;
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute::server::ComputeInstanceContext;
25use mz_http_util::DynamicFilterTarget;
26use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
27use mz_ore::cli::{self, CliConfig};
28use mz_ore::error::ErrorExt;
29use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
30use mz_ore::netio::{Listener, SocketAddr};
31use mz_ore::now::SYSTEM_TIME;
32use mz_persist_client::cache::PersistClientCache;
33use mz_persist_client::cfg::PersistConfig;
34use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
35use mz_service::emit_boot_diagnostics;
36use mz_service::grpc::GrpcServerMetrics;
37use mz_service::secrets::SecretsReaderCliArgs;
38use mz_service::transport;
39use mz_storage::storage_state::StorageInstanceContext;
40use mz_storage_types::connections::ConnectionContext;
41use mz_timely_util::capture::arc_event_link;
42use mz_txn_wal::operator::TxnsContext;
43use tokio::runtime::Handle;
44use tower::Service;
45use tracing::{Instrument, debug, error, info, info_span};
46
// Collection of process usage metrics exposed via `/api/usage-metrics`.
mod usage_metrics;

/// Build information (version, git SHA, …) baked in at compile time.
const BUILD_INFO: BuildInfo = build_info!();

/// Human-readable version string derived from [`BUILD_INFO`]; computed lazily
/// on first use (e.g. for clap's `--version` output).
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
52
/// Independent cluster server for Materialize.
///
/// Command-line arguments for `clusterd`. Each flag can also be supplied via
/// the listed environment variable; `main` additionally accepts a `CLUSTERD_`
/// prefix through `cli::parse_args`.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // === Connection options. ===
    /// The address on which to listen for a connection from the storage
    /// controller.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    /// The address on which to listen for a connection from the compute
    /// controller.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    /// The address of the internal HTTP server.
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    /// The FQDN of this process, for GRPC request validation.
    ///
    /// Not providing this value or setting it to the empty string disables host validation for
    /// GRPC requests.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    // === Timely cluster options. ===
    /// Configuration for the storage Timely cluster.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    /// Configuration for the compute Timely cluster.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    /// The index of the process in both Timely clusters.
    #[clap(long, env = "PROCESS")]
    process: usize,

    // === Storage options. ===
    /// The URL for the Persist PubSub service.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // === Cloud options. ===
    /// An external ID to be supplied to all AWS AssumeRole operations.
    ///
    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // === Secrets reader options. ===
    // Flattened sub-arguments for configuring the secrets reader backend.
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // === Tracing options. ===
    // Flattened sub-arguments for log/trace configuration.
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // === Other options. ===
    /// An opaque identifier for the environment in which this process is
    /// running.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    /// A scratch directory that can be used for ephemeral storage.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    /// Memory limit (bytes) of the cluster replica, if known.
    ///
    /// The limit is expected to be enforced by the orchestrator. The clusterd process only uses it
    /// to inform configuration of backpressure mechanism.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    /// Heap limit (bytes) of the cluster replica.
    ///
    /// A process heap usage is calculated as the sum of its memory and swap usage.
    ///
    /// In contrast to `announce_memory_limit`, this limit is enforced by the clusterd process. If
    /// the limit is exceeded, the process terminates itself with a 167 exit code.
    #[clap(long)]
    heap_limit: Option<usize>,

    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[clap(long)]
    is_cc: bool,

    /// Set core affinity for Timely workers.
    ///
    /// This flag should only be set if the process is provided with exclusive access to its
    /// supplied CPU cores. If other processes are competing over the same cores, setting core
    /// affinity might degrade dataflow performance rather than improving it.
    #[clap(long)]
    worker_core_affinity: bool,

    /// Forward storage's timely logging events to compute so storage operators appear in
    /// `mz_introspection.mz_dataflow_*` tables.
    #[clap(long)]
    enable_storage_introspection_logs: bool,
}
176
177pub fn main() {
178    mz_ore::panic::install_enhanced_handler();
179
180    let args = cli::parse_args(CliConfig {
181        env_prefix: Some("CLUSTERD_"),
182        enable_version_flag: true,
183    });
184
185    let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
186    let runtime = tokio::runtime::Builder::new_multi_thread()
187        .worker_threads(ncpus_useful)
188        .thread_stack_size(3 * 1024 * 1024) // 3 MiB
189        // The default thread name exceeds the Linux limit on thread name
190        // length, so pick something shorter. The maximum length is 16 including
191        // a \0 terminator. This gives us four decimals, which should be enough
192        // for most existing computers.
193        .thread_name_fn(|| {
194            use std::sync::atomic::{AtomicUsize, Ordering};
195            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
196            let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
197            format!("tokio:work-{}", id)
198        })
199        .enable_all()
200        .build()
201        .unwrap();
202    if let Err(err) = runtime.block_on(run(args)) {
203        panic!("clusterd: fatal: {}", err.display_with_causes());
204    }
205}
206
/// Runs the `clusterd` server until the process is killed.
///
/// Startup proceeds in order: metrics and tracing setup, panic-hook and
/// failpoint installation, allocator/metrics registration, optional memory
/// limiter, secrets reader, the internal HTTP server, the persist client
/// cache, and finally the storage and compute Timely servers. Once both
/// servers are listening, this function blocks forever.
///
/// # Errors
///
/// Returns an error if any startup step fails: configuring tracing, loading
/// the secrets reader, binding the internal HTTP listener, constructing the
/// storage instance context, or starting either Timely server.
async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let tracing_handle = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    // Shared with both the storage and compute servers below.
    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    // Keep this _after_ the mz_ore::tracing::configure call so that its panic
    // hook runs _before_ the one that sends things to sentry.
    mz_timely_util::panic::halt_on_timely_communication_panic();

    // Held for the lifetime of `run`; dropping it would tear down failpoints.
    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    // Self-enforced heap limit (see `Args::heap_limit`); distinct from the
    // orchestrator-enforced `announce_memory_limit`.
    if let Some(heap_limit) = args.heap_limit {
        mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
    } else {
        info!("no heap limit announced; disabling memory limiter");
    }

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    let usage_collector = Arc::new(usage_metrics::Collector {
        disk_root: args.scratch_directory.clone(),
    });

    // NOTE: the block passed as the second argument runs eagerly here in
    // `run`, not inside the spawned task — so the listener bind error (the
    // `?` below) propagates out of `run`. Only the trailing `async move`
    // accept loop executes on the spawned task.
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            .route(
                "/api/opentelemetry/config",
                // Deprecated endpoint: always answers 400 pointing at the
                // replacement system variable.
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                // Deprecated endpoint: always answers 400 pointing at the
                // replacement system variable.
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/usage-metrics",
                routing::get(async move || axum::Json(usage_collector.collect())),
            )
            .into_make_service();

        // Once https://github.com/tokio-rs/axum/pull/2479 lands, this can become just a call to
        // `axum::serve`.
        async move {
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        // Match hyper's AddrIncoming error handling:
                        // connection errors are per-connection and can be
                        // skipped immediately; all other errors (e.g., EMFILE)
                        // sleep to avoid a tight loop on resource exhaustion.
                        if is_connection_error(&error) {
                            debug!("accepted connection already errored: {error:#}");
                        } else {
                            error!("internal_http accept error: {error:#}");
                            tokio::time::sleep(Duration::from_secs(1)).await;
                        }
                        continue;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                // One task per accepted connection; HTTP/1 only.
                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            // This can happen when the client performs an unclean shutdown, so a
                            // high severity isn't warranted. Might even downgrade this to DEBUG if
                            // it turns out too noisy.
                            info!("error serving internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

    // Identify ourselves to Persist PubSub: prefer $HOSTNAME, then the
    // tracing log prefix, then the empty string.
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // Start with compaction disabled, will get enabled once a cluster receives AllowWrites.
    persist_cfg.disable_compaction();

    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

    // Per `Args::grpc_host`, an empty string means "no host validation", so
    // normalize it to `None`.
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

    // Both Timely clusters share the single `--process` index.
    let mut storage_timely_config = args.storage_timely_config;
    storage_timely_config.process = args.process;
    let mut compute_timely_config = args.compute_timely_config;
    compute_timely_config.process = args.process;

    // We assume each storage worker has a corresponding compute worker that can process its logs.
    assert_eq!(
        storage_timely_config.workers, compute_timely_config.workers,
        "storage and compute must have equal workers-per-process",
    );

    // Create per-worker bridges for forwarding storage timely logging events to compute.
    // Writers go to the storage server, readers to the compute server; both
    // lists are empty when the feature flag is off.
    let (storage_log_writers, storage_log_readers) = if args.enable_storage_introspection_logs {
        (0..storage_timely_config.workers)
            .map(|_| arc_event_link())
            .unzip()
    } else {
        (Vec::new(), Vec::new())
    };

    // Start storage server.
    let storage_client_builder = mz_storage::serve(
        storage_timely_config,
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
        storage_log_writers,
    )
    .await?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "storage_server",
        transport::serve(
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            storage_client_builder,
            grpc_server_metrics.for_server("storage"),
        )
        .instrument(info_span!("ctp", name = "storage")),
    );

    // Start compute server.
    let compute_client_builder = mz_compute::server::serve(
        compute_timely_config,
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
        storage_log_readers,
    )
    .await?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "compute_server",
        transport::serve(
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            compute_client_builder,
            grpc_server_metrics.for_server("compute"),
        )
        .instrument(info_span!("ctp", name = "compute")),
    );

    // TODO: unify storage and compute servers to use one timely cluster.

    // Block forever.
    future::pending().await
}
468
/// Reports whether an `accept()` error is a per-connection error that can be
/// skipped immediately.
///
/// All other errors (e.g., EMFILE/ENFILE resource exhaustion) warrant a sleep
/// before retrying. Mirrors hyper's `AddrIncoming` classification.
fn is_connection_error(e: &std::io::Error) -> bool {
    use std::io::ErrorKind::{ConnectionAborted, ConnectionRefused, ConnectionReset};
    matches!(e.kind(), ConnectionRefused | ConnectionAborted | ConnectionReset)
}
479}