mz_clusterd/lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;

use anyhow::Context;
use axum::http::StatusCode;
use axum::routing;
use fail::FailScenario;
use futures::future;
use hyper_util::rt::TokioIo;
use mz_build_info::{BuildInfo, build_info};
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_cluster_client::client::TimelyConfig;
use mz_compute::server::ComputeInstanceContext;
use mz_compute_client::service::proto_compute_server::ProtoComputeServer;
use mz_http_util::DynamicFilterTarget;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::cli::{self, CliConfig};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
use mz_ore::netio::{Listener, SocketAddr};
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
use mz_service::emit_boot_diagnostics;
use mz_service::grpc::{GrpcServer, GrpcServerMetrics, MAX_GRPC_MESSAGE_SIZE};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_service::transport;
use mz_storage::storage_state::StorageInstanceContext;
use mz_storage_client::client::proto_storage_server::ProtoStorageServer;
use mz_storage_types::connections::ConnectionContext;
use mz_txn_wal::operator::TxnsContext;
use tokio::runtime::Handle;
use tower::Service;
use tracing::{error, info, warn};

mod usage_metrics;

const BUILD_INFO: BuildInfo = build_info!();

pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));

/// Independent cluster server for Materialize.
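///
/// A single `clusterd` process hosts both a storage Timely cluster and a
/// compute Timely cluster, each serving its controller on a dedicated listen
/// address. Flags can also be set through environment variables prefixed with
/// `CLUSTERD_` (see the `CliConfig` in `main`).
///
/// A sketch of an invocation, with hypothetical addresses; required flags
/// without defaults (the Timely configurations, `--environment-id`, and the
/// secrets reader options) are elided:
///
/// ```text
/// clusterd \
///     --storage-controller-listen-addr=127.0.0.1:2100 \
///     --compute-controller-listen-addr=127.0.0.1:2101 \
///     --internal-http-listen-addr=127.0.0.1:6878 \
///     --process=0 ...
/// ```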
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // === Connection options. ===
    /// The address on which to listen for a connection from the storage
    /// controller.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    /// The address on which to listen for a connection from the compute
    /// controller.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    /// The address of the internal HTTP server.
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    /// The FQDN of this process, for gRPC request validation.
    ///
    /// Not providing this value or setting it to the empty string disables host validation for
    /// gRPC requests.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,
    /// Whether to use CTP for controller connections.
    #[clap(long, env = "USE_CTP")]
    use_ctp: bool,

    // === Timely cluster options. ===
    /// Configuration for the storage Timely cluster.
    #[clap(long, env = "STORAGE_TIMELY_CONFIG")]
    storage_timely_config: TimelyConfig,
    /// Configuration for the compute Timely cluster.
    #[clap(long, env = "COMPUTE_TIMELY_CONFIG")]
    compute_timely_config: TimelyConfig,
    /// The index of the process in both Timely clusters.
    #[clap(long, env = "PROCESS")]
    process: usize,

    // === Storage options. ===
    /// The URL for the Persist PubSub service.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // === Cloud options. ===
    /// An external ID to be supplied to all AWS AssumeRole operations.
    ///
    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // === Secrets reader options. ===
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // === Tracing options. ===
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // === Other options. ===
    /// An opaque identifier for the environment in which this process is
    /// running.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    /// A scratch directory that can be used for ephemeral storage.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    /// Optional memory limit (bytes) of the cluster replica.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[clap(long)]
    is_cc: bool,

    /// Set core affinity for Timely workers.
    ///
    /// This flag should only be set if the process is provided with exclusive access to its
    /// supplied CPU cores. If other processes are competing over the same cores, setting core
    /// affinity might degrade dataflow performance rather than improving it.
    #[clap(long)]
    worker_core_affinity: bool,
}

pub fn main() {
    mz_ore::panic::install_enhanced_handler();

    let args = cli::parse_args(CliConfig {
        env_prefix: Some("CLUSTERD_"),
        enable_version_flag: true,
    });

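    // Run one tokio worker thread per physical core, and at least one.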
    let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(ncpus_useful)
        // The default thread name exceeds the Linux limit on thread name
        // length, so pick something shorter. The maximum length is 16 including
        // a \0 terminator. This gives us four decimal digits, which should be
        // enough for most existing computers.
        .thread_name_fn(|| {
            use std::sync::atomic::{AtomicUsize, Ordering};
            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
            let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
            format!("tokio:work-{}", id)
        })
        .enable_all()
        .build()
        .unwrap();
    if let Err(err) = runtime.block_on(run(args)) {
        panic!("clusterd: fatal: {}", err.display_with_causes());
    }
}

async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let (tracing_handle, _tracing_guard) = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    // Keep this _after_ the mz_ore::tracing::configure call so that its panic
    // hook runs _before_ the one that sends things to Sentry.
    mz_timely_util::panic::halt_on_timely_communication_panic();

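    // Set up failpoints, as configured via the FAILPOINTS environment
    // variable, for fault-injection testing.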
    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    if let Some(memory_limit) = args.announce_memory_limit {
        mz_compute::lgalloc::start_limiter(memory_limit, &metrics_registry);
        mz_compute::memory_limiter::start_limiter(memory_limit, &metrics_registry);
    } else {
        warn!("no memory limit announced; disabling lgalloc and memory limiters");
    }

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

    let usage_collector = Arc::new(usage_metrics::Collector {
        disk_root: args.scratch_directory.clone(),
    });

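    // Serve the internal HTTP endpoints (liveness, Prometheus metrics,
    // profiling, tracing controls, and usage metrics) on a background task.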
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/usage-metrics",
                routing::get(async move || axum::Json(usage_collector.collect())),
            )
            .into_make_service();

        // Once https://github.com/tokio-rs/axum/pull/2479 lands, this can become just a call to
        // `axum::serve`.
        async move {
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        error!("internal_http connection failed: {error:#}");
                        break;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            error!("failed to serve internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

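    // Identify this process to the Persist PubSub server: prefer $HOSTNAME
    // (e.g. the pod name under Kubernetes), fall back to the tracing log
    // prefix, and otherwise send an empty caller id.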
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // Start with compaction disabled; it will be enabled once the cluster
    // receives AllowWrites.
    persist_cfg.disable_compaction();

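    // Share one cache of Persist clients between the storage and compute
    // servers; the PubSub connection is established through the factory
    // closure supplied here.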
    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

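    // Context for connecting to external systems (AWS, secrets), shared by
    // the storage and compute dataflows.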
    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

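    // An empty --grpc-host is treated like an absent one: host validation
    // for controller requests is disabled.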
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

    let mut storage_timely_config = args.storage_timely_config;
    storage_timely_config.process = args.process;
    let mut compute_timely_config = args.compute_timely_config;
    compute_timely_config.process = args.process;

    // Start storage server.
    let storage_client_builder = mz_storage::serve(
        storage_timely_config,
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
    )
    .await?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
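    // Serve the controller connection over either CTP (mz_service::transport)
    // or gRPC, as selected by --use-ctp; `future::Either` unifies the two
    // distinct future types.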
    let storage_serve = if args.use_ctp {
        future::Either::Left(transport::serve(
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            storage_client_builder,
            grpc_server_metrics.for_server("storage"),
        ))
    } else {
        future::Either::Right(GrpcServer::serve(
            &grpc_server_metrics,
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            storage_client_builder,
            |svc| ProtoStorageServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ))
    };
    mz_ore::task::spawn(|| "storage_server", storage_serve);

    // Start compute server.
    let compute_client_builder = mz_compute::server::serve(
        compute_timely_config,
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
    )
    .await?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
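    // As for storage above: CTP or gRPC, depending on --use-ctp.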
    let compute_serve = if args.use_ctp {
        future::Either::Left(transport::serve(
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            Duration::MAX,
            compute_client_builder,
            grpc_server_metrics.for_server("compute"),
        ))
    } else {
        future::Either::Right(GrpcServer::serve(
            &grpc_server_metrics,
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host,
            compute_client_builder,
            |svc| ProtoComputeServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ))
    };
    mz_ore::task::spawn(|| "compute_server", compute_serve);

    // TODO: unify storage and compute servers to use one timely cluster.

    // Block forever.
    future::pending().await
}