mz_clusterd/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::path::PathBuf;
11use std::sync::Arc;
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use anyhow::Context;
16use axum::http::StatusCode;
17use axum::routing;
18use fail::FailScenario;
19use futures::future;
20use hyper_util::rt::TokioIo;
21use mz_build_info::{BuildInfo, build_info};
22use mz_cloud_resources::AwsExternalIdPrefix;
23use mz_cluster_client::client::TimelyConfig;
24use mz_compute::server::ComputeInstanceContext;
25use mz_http_util::DynamicFilterTarget;
26use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
27use mz_ore::cli::{self, CliConfig};
28use mz_ore::error::ErrorExt;
29use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
30use mz_ore::netio::{Listener, SocketAddr};
31use mz_ore::now::SYSTEM_TIME;
32use mz_persist_client::cache::PersistClientCache;
33use mz_persist_client::cfg::PersistConfig;
34use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
35use mz_service::emit_boot_diagnostics;
36use mz_service::grpc::GrpcServerMetrics;
37use mz_service::secrets::SecretsReaderCliArgs;
38use mz_service::transport;
39use mz_storage::storage_state::StorageInstanceContext;
40use mz_storage_types::connections::ConnectionContext;
41use mz_txn_wal::operator::TxnsContext;
42use tokio::runtime::Handle;
43use tower::Service;
44use tracing::{error, info};
45
46mod usage_metrics;
47
48const BUILD_INFO: BuildInfo = build_info!();
49
50pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
51
// NOTE: clap turns the `///` doc comments in this struct into CLI help. The struct-level one is
// the command's about text, and each field's becomes that option's `--help` entry. Editing them
// changes user-visible output, so implementation notes here are kept in plain `//` comments.
//
// `main` parses this struct with `env_prefix: Some("CLUSTERD_")`. Presumably that prefix is
// prepended to every `env = "..."` name below (e.g. `CLUSTERD_PROCESS`) -- confirm in
// `mz_ore::cli::parse_args`.
/// Independent cluster server for Materialize.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // === Connection options. ===
    /// The address on which to listen for a connection from the storage
    /// controller.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    // `run` passes this to `transport::serve` for the storage server.
    storage_controller_listen_addr: SocketAddr,
    /// The address on which to listen for a connection from the compute
    /// controller.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    // `run` passes this to `transport::serve` for the compute server.
    compute_controller_listen_addr: SocketAddr,
    /// The address of the internal HTTP server.
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    // `run` binds this and serves the `mz_prof_http` routes plus `/api/livez`, `/metrics`,
    // `/api/tracing`, and `/api/usage-metrics` on it.
    internal_http_listen_addr: SocketAddr,
    /// The FQDN of this process, for GRPC request validation.
    ///
    /// Not providing this value or setting it to the empty string disables host validation for
    /// GRPC requests.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    // `run` maps an empty string to `None` before giving it to both transport servers.
    grpc_host: Option<String>,

    // === Timely cluster options. ===
    /// Configuration for the storage Timely cluster.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    // `run` overwrites this config's `process` field with `--process`.
    storage_timely_config: TimelyConfig,
    /// Configuration for the compute Timely cluster.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    // `run` overwrites this config's `process` field with `--process`.
    compute_timely_config: TimelyConfig,
    /// The index of the process in both Timely clusters.
    #[clap(long, env = "PROCESS")]
    // Copied by `run` into both `storage_timely_config` and `compute_timely_config`.
    process: usize,

    // === Storage options. ===
    /// The URL for the Persist PubSub service.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    // Used as the `url` of the `PersistPubSubClientConfig` handed to `GrpcPubSubClient::connect`.
    persist_pubsub_url: String,

    // === Cloud options. ===
    /// An external ID to be supplied to all AWS AssumeRole operations.
    ///
    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    // Forwarded to `ConnectionContext::from_cli_args`. NOTE(review): the help text calls this an
    // "external ID", but the value is parsed into an `AwsExternalIdPrefix` -- confirm the wording.
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    // Forwarded to `ConnectionContext::from_cli_args`.
    aws_connection_role_arn: Option<String>,

    // === Secrets reader options. ===
    #[clap(flatten)]
    // `run` calls `.load()` on this to build the secrets reader used by `ConnectionContext`.
    secrets: SecretsReaderCliArgs,

    // === Tracing options. ===
    #[clap(flatten)]
    // Besides configuring tracing, `run` uses `log_prefix` as the persist PubSub caller ID when
    // `HOSTNAME` is unset, and forwards `startup_log_filter` to `ConnectionContext`.
    tracing: TracingCliArgs,

    // === Other options. ===
    /// An opaque identifier for the environment in which this process is
    /// running.
    #[clap(long, env = "ENVIRONMENT_ID")]
    // Forwarded to `ConnectionContext::from_cli_args`.
    environment_id: String,

    /// A scratch directory that can be used for ephemeral storage.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    // Shared by the usage-metrics collector (`disk_root`), `StorageInstanceContext`, and
    // `ComputeInstanceContext`.
    scratch_directory: Option<PathBuf>,

    /// Memory limit (bytes) of the cluster replica, if known.
    ///
    /// The limit is expected to be enforced by the orchestrator. The clusterd process only uses it
    /// to inform configuration of backpressure mechanism.
    #[clap(long)]
    // Forwarded to `PersistConfig::announce_memory_limit` and `StorageInstanceContext::new`.
    // NOTE(review): unlike most options above, this and the flags below have no `env = ...`
    // binding -- confirm that is intentional.
    announce_memory_limit: Option<usize>,

    /// Heap limit (bytes) of the cluster replica.
    ///
    /// A process heap usage is calculated as the sum of its memory and swap usage.
    ///
    /// In contrast to `announce_memory_limit`, this limit is enforced by the clusterd process. If
    /// the limit is exceeded, the process terminates itself with a 167 exit code.
    #[clap(long)]
    // `run` passes this to `mz_compute::memory_limiter::start_limiter`; when it is absent, `run`
    // logs that the limiter is disabled and does not start it.
    heap_limit: Option<usize>,

    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[clap(long)]
    // A plain `bool` with `long` is a value-less flag. `run` copies it into
    // `PersistConfig::is_cc_active`.
    is_cc: bool,

    /// Set core affinity for Timely workers.
    ///
    /// This flag should only be set if the process is provided with exclusive access to its
    /// supplied CPU cores. If other processes are competing over the same cores, setting core
    /// affinity might degrade dataflow performance rather than improving it.
    #[clap(long)]
    // Only the compute server receives this (via `ComputeInstanceContext`). The storage
    // instance context is built without it.
    worker_core_affinity: bool,
}
170
171pub fn main() {
172    mz_ore::panic::install_enhanced_handler();
173
174    let args = cli::parse_args(CliConfig {
175        env_prefix: Some("CLUSTERD_"),
176        enable_version_flag: true,
177    });
178
179    let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
180    let runtime = tokio::runtime::Builder::new_multi_thread()
181        .worker_threads(ncpus_useful)
182        .thread_stack_size(3 * 1024 * 1024) // 3 MiB
183        // The default thread name exceeds the Linux limit on thread name
184        // length, so pick something shorter. The maximum length is 16 including
185        // a \0 terminator. This gives us four decimals, which should be enough
186        // for most existing computers.
187        .thread_name_fn(|| {
188            use std::sync::atomic::{AtomicUsize, Ordering};
189            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
190            let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
191            format!("tokio:work-{}", id)
192        })
193        .enable_all()
194        .build()
195        .unwrap();
196    if let Err(err) = runtime.block_on(run(args)) {
197        panic!("clusterd: fatal: {}", err.display_with_causes());
198    }
199}
200
201async fn run(args: Args) -> Result<(), anyhow::Error> {
202    let metrics_registry = MetricsRegistry::new();
203    let (tracing_handle, _tracing_guard) = args
204        .tracing
205        .configure_tracing(
206            StaticTracingConfig {
207                service_name: "clusterd",
208                build_info: BUILD_INFO,
209            },
210            metrics_registry.clone(),
211        )
212        .await?;
213
214    let tracing_handle = Arc::new(tracing_handle);
215    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);
216
217    // Keep this _after_ the mz_ore::tracing::configure call so that its panic
218    // hook runs _before_ the one that sends things to sentry.
219    mz_timely_util::panic::halt_on_timely_communication_panic();
220
221    let _failpoint_scenario = FailScenario::setup();
222
223    emit_boot_diagnostics!(&BUILD_INFO);
224
225    mz_alloc::register_metrics_into(&metrics_registry).await;
226    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;
227
228    if let Some(heap_limit) = args.heap_limit {
229        mz_compute::memory_limiter::start_limiter(heap_limit, &metrics_registry);
230    } else {
231        info!("no heap limit announced; disabling memory limiter");
232    }
233
234    let secrets_reader = args
235        .secrets
236        .load()
237        .await
238        .context("loading secrets reader")?;
239
240    let usage_collector = Arc::new(usage_metrics::Collector {
241        disk_root: args.scratch_directory.clone(),
242    });
243
244    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
245        let metrics_registry = metrics_registry.clone();
246        tracing::info!(
247            "serving internal HTTP server on {}",
248            args.internal_http_listen_addr
249        );
250        let listener = Listener::bind(args.internal_http_listen_addr).await?;
251        let mut make_service = mz_prof_http::router(&BUILD_INFO)
252            .route(
253                "/api/livez",
254                routing::get(mz_http_util::handle_liveness_check),
255            )
256            .route(
257                "/metrics",
258                routing::get(move || async move {
259                    mz_http_util::handle_prometheus(&metrics_registry).await
260                }),
261            )
262            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
263            .route(
264                "/api/opentelemetry/config",
265                routing::put({
266                    move |_: axum::Json<DynamicFilterTarget>| async {
267                        (
268                            StatusCode::BAD_REQUEST,
269                            "This endpoint has been replaced. \
270                                Use the `opentelemetry_filter` system variable."
271                                .to_string(),
272                        )
273                    }
274                }),
275            )
276            .route(
277                "/api/stderr/config",
278                routing::put({
279                    move |_: axum::Json<DynamicFilterTarget>| async {
280                        (
281                            StatusCode::BAD_REQUEST,
282                            "This endpoint has been replaced. \
283                                Use the `log_filter` system variable."
284                                .to_string(),
285                        )
286                    }
287                }),
288            )
289            .route(
290                "/api/usage-metrics",
291                routing::get(async move || axum::Json(usage_collector.collect())),
292            )
293            .into_make_service();
294
295        // Once https://github.com/tokio-rs/axum/pull/2479 lands, this can become just a call to
296        // `axum::serve`.
297        async move {
298            loop {
299                let (conn, remote_addr) = match listener.accept().await {
300                    Ok(peer) => peer,
301                    Err(error) => {
302                        error!("internal_http connection failed: {error:#}");
303                        break;
304                    }
305                };
306
307                let tower_service = make_service.call(&conn).await.expect("infallible");
308                let hyper_service =
309                    hyper::service::service_fn(move |req| tower_service.clone().call(req));
310
311                mz_ore::task::spawn(
312                    || format!("clusterd_internal_http_server:{remote_addr}"),
313                    async move {
314                        if let Err(error) = hyper::server::conn::http1::Builder::new()
315                            .serve_connection(TokioIo::new(conn), hyper_service)
316                            .await
317                        {
318                            error!("failed to serve internal_http connection: {error:#}");
319                        }
320                    },
321                );
322            }
323        }
324    });
325
326    let pubsub_caller_id = std::env::var("HOSTNAME")
327        .ok()
328        .or_else(|| args.tracing.log_prefix.clone())
329        .unwrap_or_default();
330    let mut persist_cfg =
331        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
332    persist_cfg.is_cc_active = args.is_cc;
333    persist_cfg.announce_memory_limit = args.announce_memory_limit;
334    // Start with compaction disabled, will get enabled once a cluster receives AllowWrites.
335    persist_cfg.disable_compaction();
336
337    let persist_clients = Arc::new(PersistClientCache::new(
338        persist_cfg,
339        &metrics_registry,
340        |persist_cfg, metrics| {
341            let cfg = PersistPubSubClientConfig {
342                url: args.persist_pubsub_url,
343                caller_id: pubsub_caller_id,
344                persist_cfg: persist_cfg.clone(),
345            };
346            GrpcPubSubClient::connect(cfg, metrics)
347        },
348    ));
349    let txns_ctx = TxnsContext::default();
350
351    let connection_context = ConnectionContext::from_cli_args(
352        args.environment_id,
353        &args.tracing.startup_log_filter,
354        args.aws_external_id_prefix,
355        args.aws_connection_role_arn,
356        secrets_reader,
357        None,
358    );
359
360    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
361    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);
362
363    let mut storage_timely_config = args.storage_timely_config;
364    storage_timely_config.process = args.process;
365    let mut compute_timely_config = args.compute_timely_config;
366    compute_timely_config.process = args.process;
367
368    // Start storage server.
369    let storage_client_builder = mz_storage::serve(
370        storage_timely_config,
371        &metrics_registry,
372        Arc::clone(&persist_clients),
373        txns_ctx.clone(),
374        Arc::clone(&tracing_handle),
375        SYSTEM_TIME.clone(),
376        connection_context.clone(),
377        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
378    )
379    .await?;
380    info!(
381        "listening for storage controller connections on {}",
382        args.storage_controller_listen_addr
383    );
384    mz_ore::task::spawn(
385        || "storage_server",
386        transport::serve(
387            args.storage_controller_listen_addr,
388            BUILD_INFO.semver_version(),
389            grpc_host.clone(),
390            Duration::MAX,
391            storage_client_builder,
392            grpc_server_metrics.for_server("storage"),
393        ),
394    );
395
396    // Start compute server.
397    let compute_client_builder = mz_compute::server::serve(
398        compute_timely_config,
399        &metrics_registry,
400        persist_clients,
401        txns_ctx,
402        tracing_handle,
403        ComputeInstanceContext {
404            scratch_directory: args.scratch_directory,
405            worker_core_affinity: args.worker_core_affinity,
406            connection_context,
407        },
408    )
409    .await?;
410    info!(
411        "listening for compute controller connections on {}",
412        args.compute_controller_listen_addr
413    );
414    mz_ore::task::spawn(
415        || "compute_server",
416        transport::serve(
417            args.compute_controller_listen_addr,
418            BUILD_INFO.semver_version(),
419            grpc_host.clone(),
420            Duration::MAX,
421            compute_client_builder,
422            grpc_server_metrics.for_server("compute"),
423        ),
424    );
425
426    // TODO: unify storage and compute servers to use one timely cluster.
427
428    // Block forever.
429    future::pending().await
430}