mz_clusterd/lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

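//! Independent cluster server for Materialize (`clusterd`).
//!
//! This binary hosts the storage and compute gRPC servers on separate
//! listeners and exposes an internal HTTP server for liveness checks,
//! Prometheus metrics, profiling, and tracing.
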
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;

use anyhow::Context;
use axum::http::StatusCode;
use axum::routing;
use fail::FailScenario;
use futures::future;
use hyper_util::rt::TokioIo;
use mz_build_info::{BuildInfo, build_info};
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_compute::server::ComputeInstanceContext;
use mz_compute_client::service::proto_compute_server::ProtoComputeServer;
use mz_http_util::DynamicFilterTarget;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::cli::{self, CliConfig};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
use mz_ore::netio::{Listener, SocketAddr};
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::{GrpcPubSubClient, PersistPubSubClient, PersistPubSubClientConfig};
use mz_service::emit_boot_diagnostics;
use mz_service::grpc::{GrpcServer, GrpcServerMetrics, MAX_GRPC_MESSAGE_SIZE};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_storage::storage_state::StorageInstanceContext;
use mz_storage_client::client::proto_storage_server::ProtoStorageServer;
use mz_storage_types::connections::ConnectionContext;
use mz_txn_wal::operator::TxnsContext;
use tokio::runtime::Handle;
use tower::Service;
use tracing::{error, info};

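/// Build information for this binary, captured at compile time by `build_info!`.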
const BUILD_INFO: BuildInfo = build_info!();

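/// The human-readable version string reported by `--version`.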
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));

/// Independent cluster server for Materialize.
#[derive(clap::Parser)]
#[clap(name = "clusterd", version = VERSION.as_str())]
struct Args {
    // === Connection options. ===
    /// The address on which to listen for a connection from the storage
    /// controller.
    #[clap(
        long,
        env = "STORAGE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2100"
    )]
    storage_controller_listen_addr: SocketAddr,
    /// The address on which to listen for a connection from the compute
    /// controller.
    #[clap(
        long,
        env = "COMPUTE_CONTROLLER_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:2101"
    )]
    compute_controller_listen_addr: SocketAddr,
    /// The address of the internal HTTP server.
    #[clap(
        long,
        env = "INTERNAL_HTTP_LISTEN_ADDR",
        value_name = "HOST:PORT",
        default_value = "127.0.0.1:6878"
    )]
    internal_http_listen_addr: SocketAddr,
    /// The FQDN of this process, for GRPC request validation.
    ///
    /// Not providing this value or setting it to the empty string disables host validation for
    /// GRPC requests.
    #[clap(long, env = "GRPC_HOST", value_name = "NAME")]
    grpc_host: Option<String>,

    // === Storage options. ===
    /// The URL for the Persist PubSub service.
    #[clap(
        long,
        env = "PERSIST_PUBSUB_URL",
        value_name = "http://HOST:PORT",
        default_value = "http://localhost:6879"
    )]
    persist_pubsub_url: String,

    // === Cloud options. ===
    /// An external ID to be supplied to all AWS AssumeRole operations.
    ///
    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,

    // === Secrets reader options. ===
    #[clap(flatten)]
    secrets: SecretsReaderCliArgs,

    // === Tracing options. ===
    #[clap(flatten)]
    tracing: TracingCliArgs,

    // === Other options. ===
    /// An opaque identifier for the environment in which this process is
    /// running.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: String,

    /// A scratch directory that can be used for ephemeral storage.
    #[clap(long, env = "SCRATCH_DIRECTORY", value_name = "PATH")]
    scratch_directory: Option<PathBuf>,

    /// Optional memory limit (bytes) of the cluster replica.
    #[clap(long)]
    announce_memory_limit: Option<usize>,

    /// Whether this size represents a modern "cc" size rather than a legacy
    /// T-shirt size.
    #[clap(long)]
    is_cc: bool,

    /// Set core affinity for Timely workers.
    ///
    /// This flag should only be set if the process is provided with exclusive access to its
    /// supplied CPU cores. If other processes are competing over the same cores, setting core
    /// affinity might degrade dataflow performance rather than improving it.
    #[clap(long)]
    worker_core_affinity: bool,
}

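/// Entry point: installs the enhanced panic handler, parses the command-line
/// arguments (with the `CLUSTERD_` environment variable prefix), builds a
/// multi-threaded Tokio runtime sized to the useful CPU count, and drives
/// [`run`] to completion, panicking on a fatal error.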
pub fn main() {
    mz_ore::panic::install_enhanced_handler();

    let args = cli::parse_args(CliConfig {
        env_prefix: Some("CLUSTERD_"),
        enable_version_flag: true,
    });

    let ncpus_useful = usize::max(1, std::cmp::min(num_cpus::get(), num_cpus::get_physical()));
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(ncpus_useful)
        // The default thread name exceeds the Linux limit on thread name
        // length, so pick something shorter. The maximum length is 16 including
        // a \0 terminator. This gives us four decimals, which should be enough
        // for most existing computers.
        .thread_name_fn(|| {
            use std::sync::atomic::{AtomicUsize, Ordering};
            static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
            let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
            format!("tokio:work-{}", id)
        })
        .enable_all()
        .build()
        .unwrap();
    if let Err(err) = runtime.block_on(run(args)) {
        panic!("clusterd: fatal: {}", err.display_with_causes());
    }
}

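/// The async body of the process: configures tracing and metrics, spawns the
/// internal HTTP server, sets up the Persist and connection contexts, then
/// starts the storage and compute gRPC servers and blocks forever.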
async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let (tracing_handle, _tracing_guard) = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "clusterd",
                build_info: BUILD_INFO,
            },
            metrics_registry.clone(),
        )
        .await?;

    let tracing_handle = Arc::new(tracing_handle);
    register_runtime_metrics("main", Handle::current().metrics(), &metrics_registry);

    // Keep this _after_ the mz_ore::tracing::configure call so that its panic
    // hook runs _before_ the one that sends things to sentry.
    mz_timely_util::panic::halt_on_timely_communication_panic();

    let _failpoint_scenario = FailScenario::setup();

    emit_boot_diagnostics!(&BUILD_INFO);

    mz_alloc::register_metrics_into(&metrics_registry).await;
    mz_metrics::register_metrics_into(&metrics_registry, mz_dyncfgs::all_dyncfgs()).await;

    let secrets_reader = args
        .secrets
        .load()
        .await
        .context("loading secrets reader")?;

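    // Serve the internal HTTP endpoints (liveness checks, Prometheus metrics,
    // profiling, and tracing) on a background task.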
    mz_ore::task::spawn(|| "clusterd_internal_http_server", {
        let metrics_registry = metrics_registry.clone();
        tracing::info!(
            "serving internal HTTP server on {}",
            args.internal_http_listen_addr
        );
        let listener = Listener::bind(args.internal_http_listen_addr).await?;
        let mut make_service = mz_prof_http::router(&BUILD_INFO)
            .route(
                "/api/livez",
                routing::get(mz_http_util::handle_liveness_check),
            )
            .route(
                "/metrics",
                routing::get(move || async move {
                    mz_http_util::handle_prometheus(&metrics_registry).await
                }),
            )
            .route("/api/tracing", routing::get(mz_http_util::handle_tracing))
            .route(
                "/api/opentelemetry/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `opentelemetry_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .route(
                "/api/stderr/config",
                routing::put({
                    move |_: axum::Json<DynamicFilterTarget>| async {
                        (
                            StatusCode::BAD_REQUEST,
                            "This endpoint has been replaced. \
                                Use the `log_filter` system variable."
                                .to_string(),
                        )
                    }
                }),
            )
            .into_make_service();

        // Once https://github.com/tokio-rs/axum/pull/2479 lands, this can become just a call to
        // `axum::serve`.
        async move {
            loop {
                let (conn, remote_addr) = match listener.accept().await {
                    Ok(peer) => peer,
                    Err(error) => {
                        error!("internal_http connection failed: {error:#}");
                        break;
                    }
                };

                let tower_service = make_service.call(&conn).await.expect("infallible");
                let hyper_service =
                    hyper::service::service_fn(move |req| tower_service.clone().call(req));

                mz_ore::task::spawn(
                    || format!("clusterd_internal_http_server:{remote_addr}"),
                    async move {
                        if let Err(error) = hyper::server::conn::http1::Builder::new()
                            .serve_connection(TokioIo::new(conn), hyper_service)
                            .await
                        {
                            error!("failed to serve internal_http connection: {error:#}");
                        }
                    },
                );
            }
        }
    });

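    // Set up the Persist client cache, connecting its PubSub client to the
    // configured Persist PubSub service.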
    let pubsub_caller_id = std::env::var("HOSTNAME")
        .ok()
        .or_else(|| args.tracing.log_prefix.clone())
        .unwrap_or_default();
    let mut persist_cfg =
        PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
    persist_cfg.is_cc_active = args.is_cc;
    persist_cfg.announce_memory_limit = args.announce_memory_limit;
    // Start with compaction disabled; it will get enabled once the cluster receives AllowWrites.
    persist_cfg.disable_compaction();

    let persist_clients = Arc::new(PersistClientCache::new(
        persist_cfg,
        &metrics_registry,
        |persist_cfg, metrics| {
            let cfg = PersistPubSubClientConfig {
                url: args.persist_pubsub_url,
                caller_id: pubsub_caller_id,
                persist_cfg: persist_cfg.clone(),
            };
            GrpcPubSubClient::connect(cfg, metrics)
        },
    ));
    let txns_ctx = TxnsContext::default();

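    // Bundle the configuration needed to connect to external systems
    // (secrets, AWS role settings) for use by both servers.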
    let connection_context = ConnectionContext::from_cli_args(
        args.environment_id,
        &args.tracing.startup_log_filter,
        args.aws_external_id_prefix,
        args.aws_connection_role_arn,
        secrets_reader,
        None,
    );

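    // An empty --grpc-host is treated as unset, which disables host
    // validation for incoming gRPC requests.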
    let grpc_host = args.grpc_host.and_then(|h| (!h.is_empty()).then_some(h));
    let grpc_server_metrics = GrpcServerMetrics::register_with(&metrics_registry);

    // Start storage server.
    let storage_client_builder = mz_storage::serve(
        &metrics_registry,
        Arc::clone(&persist_clients),
        txns_ctx.clone(),
        Arc::clone(&tracing_handle),
        SYSTEM_TIME.clone(),
        connection_context.clone(),
        StorageInstanceContext::new(args.scratch_directory.clone(), args.announce_memory_limit)?,
    )?;
    info!(
        "listening for storage controller connections on {}",
        args.storage_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "storage_server",
        GrpcServer::serve(
            &grpc_server_metrics,
            args.storage_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host.clone(),
            storage_client_builder,
            |svc| ProtoStorageServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ),
    );

    // Start compute server.
    let compute_client_builder = mz_compute::server::serve(
        &metrics_registry,
        persist_clients,
        txns_ctx,
        tracing_handle,
        ComputeInstanceContext {
            scratch_directory: args.scratch_directory,
            worker_core_affinity: args.worker_core_affinity,
            connection_context,
        },
    )?;
    info!(
        "listening for compute controller connections on {}",
        args.compute_controller_listen_addr
    );
    mz_ore::task::spawn(
        || "compute_server",
        GrpcServer::serve(
            &grpc_server_metrics,
            args.compute_controller_listen_addr,
            BUILD_INFO.semver_version(),
            grpc_host,
            compute_client_builder,
            |svc| ProtoComputeServer::new(svc).max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
        ),
    );

    // TODO: unify storage and compute servers to use one timely cluster.

    // Block forever.
    future::pending().await
}