mz_clusterd/lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::Context;
use axum::http::StatusCode;
use axum::routing;
use fail::FailScenario;
use futures::future;
use hyper_util::rt::TokioIo;
use mz_build_info::{BuildInfo, build_info};
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cluster_client::client::TimelyConfig;
use mz_compute::server::ComputeInstanceContext;
use mz_http_util::DynamicFilterTarget;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::cli::{self, CliConfig};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
use mz_ore::netio::{Listener, SocketAddr};
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
use mz_service::emit_boot_diagnostics;
use mz_service::grpc::GrpcServerMetrics;
use mz_service::secrets::SecretsReaderCliArgs;
use mz_service::transport;
use mz_storage::storage_state::StorageInstanceContext;
use mz_storage_types::connections::ConnectionContext;
use mz_txn_wal::operator::TxnsContext;
use tokio::runtime::Handle;
use tower::Service;
use tracing::{error, info};

mod usage_metrics;

const BUILD_INFO: BuildInfo = build_info!();

pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));

/// Independent cluster server for Materialize.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // === Connection options. ===
    /// The address on which to listen for a connection from the storage
    /// controller.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    /// The address on which to listen for a connection from the compute
    /// controller.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    /// The address of the internal HTTP server.
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    /// The FQDN of this process, for gRPC request validation.
    ///
    /// Not providing this value or setting it to the empty string disables host validation for
    /// gRPC requests.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    // === Timely cluster options. ===
    /// Configuration for the storage Timely cluster.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    /// Configuration for the compute Timely cluster.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    /// The index of the process in both Timely clusters.
    #[clap(long, env = "PROCESS")]
    process: usize,

    // === Storage options. ===
    /// The URL for the Persist PubSub service.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // === Cloud options. ===
    /// An external ID to be supplied to all AWS AssumeRole operations.
    ///
    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // === Secrets reader options. ===
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // === Tracing options. ===
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // === Other options. ===
    /// An opaque identifier for the environment in which this process is
    /// running.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    /// A scratch directory that can be used for ephemeral storage.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    /// Memory limit (bytes) of the cluster replica, if known.
    ///
    /// The limit is expected to be enforced by the orchestrator; the clusterd process only uses it
    /// to inform the configuration of the backpressure mechanism.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    /// Heap limit (bytes) of the cluster replica.
    ///
    /// A process's heap usage is calculated as the sum of its memory and swap usage.
    ///
    /// In contrast to `announce_memory_limit`, this limit is enforced by the clusterd process. If
    /// the limit is exceeded, the process terminates itself with exit code 167.
    #[clap(long)]
    heap_limit: Option<usize>,

    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[clap(long)]
    is_cc: bool,

    /// Set core affinity for Timely workers.
    ///
    /// This flag should only be set if the process is provided with exclusive access to its
    /// supplied CPU cores. If other processes are competing over the same cores, setting core
    /// affinity might degrade dataflow performance rather than improving it.
    #[clap(long)]
    worker_core_affinity: bool,
}
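
// A minimal illustrative invocation (values are placeholders, not
// recommendations). Because `main` parses arguments with
// `env_prefix: Some("CLUSTERD_")`, each `env = "..."` name above can also be
// supplied as a `CLUSTERD_`-prefixed environment variable, e.g.
// `CLUSTERD_STORAGE_TIMELY_CONFIG`.
//
//     clusterd \
//         --storage-controller-listen-addr=127.0.0.1:2100 \
//         --compute-controller-listen-addr=127.0.0.1:2101 \
//         --internal-http-listen-addr=127.0.0.1:6878 \
//         --storage-timely-config=... \
//         --compute-timely-config=... \
//         --process=0 \
//         --environment-id=... \
//         ...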

pub fn main() {
    mz_ore::panic::install_enhanced_handler();

    let args = cli::parse_args(CliConfig {
        env_prefix: Some("CLUSTERD_"),
        enable_version_flag: true,
    });

    let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(ncpus_useful)
        // The default thread name exceeds the Linux limit on thread name
        // length, so pick something shorter. The maximum length is 16 bytes,
        // including a \0 terminator. This leaves room for four decimal digits,
        // which should be enough for most existing computers.
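        // ("tokio:work-" is 11 bytes; 11 + 4 digits + the \0 terminator = 16.)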
        .thread_name_fn(|| {
            use std::sync::atomic::{AtomicUsize, Ordering};
            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
            let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
            format!("tokio:work-{}", id)
        })
        .enable_all()
        .build()
        .unwrap();
    if let Err(err) = runtime.block_on(run(args)) {
        panic!("clusterd: fatal: {}", err.display_with_causes());
    }
}

async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let (tracing_handle, _tracing_guard) = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    // Keep this _after_ the tracing configuration above so that its panic
    // hook runs _before_ the one that sends things to Sentry.
    mz_timely_util::panic::halt_on_timely_communication_panic();

    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

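    // Enforce the heap limit, if one was given. Per the `heap_limit` flag docs
    // above, the limiter terminates the process with exit code 167 when the
    // limit is exceeded.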
    if let Some(heap_limit) = args.heap_limit {
        mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
    } else {
        info!("no heap limit announced; disabling memory limiter");
    }

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    let usage_collector = Arc::new(usage_metrics::Collector {
        disk_root: args.scratch_directory.clone(),
    });

    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/usage-metrics",
                routing::get(async move || axum::Json(usage_collector.collect())),
            )
            .into_make_service();

        // Once https://github.com/tokio-rs/axum/pull/2479 lands, this can become just a call to
        // `axum::serve`.
        async move {
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        error!("internal_http connection failed: {error:#}");
                        break;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            error!("failed to serve internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });
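
    // The endpoints registered above can be exercised directly, e.g. against
    // the default internal HTTP listen address (illustrative commands):
    //
    //     curl http://127.0.0.1:6878/api/livez
    //     curl http://127.0.0.1:6878/metrics
    //     curl http://127.0.0.1:6878/api/usage-metrics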

    // Identify ourselves to the Persist PubSub server: prefer the HOSTNAME
    // (typically set by the orchestrator), then the tracing log prefix, and
    // otherwise fall back to an empty caller ID.
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // Start with compaction disabled; it will be enabled once the cluster receives AllowWrites.
    persist_cfg.disable_compaction();

    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

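    // Per the `grpc_host` flag docs, an empty host disables validation, so
    // normalize the empty string to `None`.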
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

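    // Patch this process's index (supplied via `--process`) into both Timely
    // configurations.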
    let mut storage_timely_config = args.storage_timely_config;
    storage_timely_config.process = args.process;
    let mut compute_timely_config = args.compute_timely_config;
    compute_timely_config.process = args.process;

    // Start storage server.
    let storage_client_builder = mz_storage::serve(
        storage_timely_config,
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
    )
    .await?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "storage_server",
        transport::serve(
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            storage_client_builder,
            grpc_server_metrics.for_server("storage"),
        ),
    );

    // Start compute server.
    let compute_client_builder = mz_compute::server::serve(
        compute_timely_config,
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
    )
    .await?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "compute_server",
        transport::serve(
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            compute_client_builder,
            grpc_server_metrics.for_server("compute"),
        ),
    );

    // TODO: unify storage and compute servers to use one timely cluster.

    // Block forever.
    future::pending().await
}