mz_debug/
main.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Debug tool for self-managed and emulator Materialize environments.
11use std::path::PathBuf;
12use std::process;
13use std::sync::LazyLock;
14
15use anyhow::Context as AnyhowContext;
16use chrono::Utc;
17use clap::Parser;
18use kube::config::KubeConfigOptions;
19use kube::{Client as KubernetesClient, Config};
20use mz_build_info::{BuildInfo, build_info};
21use mz_ore::cli::{self, CliConfig};
22use mz_ore::error::ErrorExt;
23use tracing::{error, info, warn};
24use tracing_subscriber::EnvFilter;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
28use crate::docker_dumper::DockerDumper;
29use crate::internal_http_dumper::{dump_emulator_http_resources, dump_self_managed_http_resources};
30use crate::k8s_dumper::K8sDumper;
31use crate::kubectl_port_forwarder::{PortForwardConnection, create_pg_wire_port_forwarder};
32use crate::utils::{
33    create_tracing_log_file, format_base_path, get_k8s_auth_mode, validate_pg_connection_string,
34    zip_debug_folder,
35};
36
37mod docker_dumper;
38mod internal_http_dumper;
39mod k8s_dumper;
40mod kubectl_port_forwarder;
41mod system_catalog_dumper;
42mod utils;
43
/// Build metadata produced by the `build_info!` macro.
const BUILD_INFO: BuildInfo = build_info!();
/// Human-readable version string derived from `BUILD_INFO`, computed lazily on first
/// access. Passed to clap as the `version` of the `Args` command.
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
/// Filter directive handed to `EnvFilter::new` when `main` installs the tracing subscriber.
static ENV_FILTER: &str = "mz_debug=info";
/// Port used for the emulator's SQL connection when no connection URL override is given
/// (see the `Emulator` branch of `initialize_context`).
pub static DEFAULT_MZ_ENVIRONMENTD_PORT: i32 = 6875;
48
// Flags accepted when debugging a self-managed (Kubernetes) deployment; wrapped by
// `DebugModeArgs::SelfManaged`.
//
// The `///` comments on the fields are intentionally left verbatim: clap's derive uses
// field doc comments as the flag's help text, so they are user-facing strings.
#[derive(Parser, Debug, Clone)]
pub struct SelfManagedDebugModeArgs {
    // === Kubernetes options. ===
    /// If true, the tool will dump debug information in Kubernetes cluster such as logs, pod describes, etc.
    // `ArgAction::Set` makes this a flag that takes an explicit value; "true" is used
    // when the flag is omitted.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    dump_k8s: bool,

    /// The k8s namespace that the Materialize instance is running in. This is necessary to interact
    /// with the k8s API for gathering logs, port forwarding information, and information about the user's
    /// environment.
    // Required: no default and not an `Option`.
    #[clap(long)]
    k8s_namespace: String,
    /// The name of the Materialize instance to target.
    // Required: no default and not an `Option`.
    #[clap(long)]
    mz_instance_name: String,
    /// A list of namespaces to dump.
    // Repeatable flag: every `--additional-k8s-namespace` occurrence appends one entry.
    #[clap(
        long = "additional-k8s-namespace",
        action = clap::ArgAction::Append
    )]
    additional_k8s_namespaces: Option<Vec<String>>,
    /// The kubernetes context to use.
    // Can also be supplied through the `KUBERNETES_CONTEXT` environment variable. When
    // absent, `create_k8s_client` receives `None` and the default context is used.
    #[clap(long, env = "KUBERNETES_CONTEXT")]
    k8s_context: Option<String>,
}
74
// Flags accepted when debugging an emulator (Docker) deployment; wrapped by
// `DebugModeArgs::Emulator`.
//
// The `///` comments on the fields are intentionally left verbatim: clap's derive uses
// field doc comments as the flag's help text, so they are user-facing strings.
#[derive(Parser, Debug, Clone)]
pub struct EmulatorDebugModeArgs {
    /// If true, the tool will dump debug information of the docker container.
    // `ArgAction::Set` makes this a flag that takes an explicit value; "true" is used
    // when the flag is omitted.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    dump_docker: bool,
    /// The ID of the docker container to dump.
    // Required. Also used by `initialize_context` to look up the container's IP address.
    #[clap(long)]
    docker_container_id: String,
}
84
// The subcommand selecting which kind of environment is being debugged. It is attached to
// `Args` through `#[clap(subcommand)]`; each variant carries that mode's own flags.
//
// The `///` comments on the variants are left verbatim because clap's derive uses them as
// the subcommand descriptions shown in help output.
#[derive(Parser, Debug, Clone)]
pub enum DebugModeArgs {
    /// Debug self-managed environments
    SelfManaged(SelfManagedDebugModeArgs),
    /// Debug emulator environments
    Emulator(EmulatorDebugModeArgs),
}
92
// Top-level command line for `mz-debug`: one subcommand (`DebugModeArgs`) plus flags
// shared by every mode. The shared flags are declared `global = true`, which lets clap
// accept them after the subcommand name as well.
//
// The `///` comments on the fields are intentionally left verbatim: clap's derive uses
// field doc comments as the flag's help text, so they are user-facing strings.
#[derive(Parser, Debug, Clone)]
#[clap(name = "mz-debug", next_line_help = true, version = VERSION.as_str())]
pub struct Args {
    // Mode-specific flags (self-managed vs. emulator).
    #[clap(subcommand)]
    debug_mode_args: DebugModeArgs,
    /// If true, the tool will dump the system catalog in Materialize.
    // Gates the system catalog section of `run`.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
    dump_system_catalog: bool,
    /// If true, the tool will dump the heap profiles in Materialize.
    // Copied into `Context`; not read in this file — presumably consumed by the HTTP dumper.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
    dump_heap_profiles: bool,
    /// If true, the tool will dump the prometheus metrics in Materialize.
    // Copied into `Context`; not read in this file — presumably consumed by the HTTP dumper.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
    dump_prometheus_metrics: bool,
    /// The username to use to connect to Materialize,
    // Can also be supplied through the `MZ_USERNAME` environment variable.
    #[clap(long, env = "MZ_USERNAME", global = true)]
    mz_username: Option<String>,
    /// The password to use to connect to Materialize if the authenticator kind is Password.
    // Can also be supplied through the `MZ_PASSWORD` environment variable.
    #[clap(long, env = "MZ_PASSWORD", global = true)]
    mz_password: Option<String>,
    /// The URL of the Materialize SQL connection used to dump the system catalog.
    /// An example URL is `postgres://root@127.0.0.1:6875/materialize?sslmode=disable`.
    /// This acts as an override. By default, we will connect to the auto-port-forwarded connection for self-managed
    /// or `<docker_container_ip>:6875` for the emulator.
    /// If defined, `mz_username` and `mz_password` flags are ignored.
    // Parsed through `validate_pg_connection_string`; can also be supplied through the
    // `MZ_CONNECTION_URL` environment variable.
    #[clap(
        long,
        env = "MZ_CONNECTION_URL",
        value_parser = validate_pg_connection_string,
        global = true
    )]
    mz_connection_url: Option<String>,
}
126
/// Common interface for dumping the resources of the environment hosting Materialize.
///
/// `run` calls `dump_container_resources` on the `K8sDumper` and `DockerDumper` it builds;
/// presumably both implement this trait in their own modules — confirm there.
pub trait ContainerDumper {
    /// Dumps the container/cluster resources. The returned future yields `()`, so
    /// failures are not reported to the caller through the return value.
    fn dump_container_resources(&self) -> impl std::future::Future<Output = ()>;
}
130
/// Username/password pair used when connecting to Materialize with password
/// authentication.
///
/// NOTE(review): the derived `Debug` implementation formats `password` like any other
/// field, so debug-printing this type (or a type containing it) exposes the password.
#[derive(Debug, Clone)]
pub struct PasswordAuthCredentials {
    /// User to authenticate as.
    pub username: String,
    /// Password for `username`.
    pub password: String,
}
136
/// How the tool authenticates against Materialize.
#[derive(Debug, Clone)]
pub enum AuthMode {
    /// No credentials are supplied; generated connection URLs carry no user info.
    None,
    /// Authenticate with the given username and password.
    Password(PasswordAuthCredentials),
}
142
/// SQL connection details for a self-managed instance reached through a port forward.
struct PortForwardConnectionInfo {
    /// The spawned port forward; `run` reads its `local_address` and `local_port` to
    /// build the connection URL.
    connection: PortForwardConnection,
    /// Auth mode used to decide whether credentials are embedded in that URL.
    auth_mode: AuthMode,
}
147
/// How to reach the SQL interface of a self-managed Materialize instance.
enum SelfManagedMzConnectionInfo {
    /// Connect through a port forward created by `initialize_context`.
    PortForward(PortForwardConnectionInfo),
    /// Connect with the user-supplied connection URL; no port forward is created.
    ConnectionUrlOverride(String),
}
152
/// Resolved state for a self-managed run, assembled by `initialize_context` from
/// `SelfManagedDebugModeArgs` and the global flags.
struct SelfManagedContext {
    /// Whether `run` should dump Kubernetes resources via `K8sDumper`.
    dump_k8s: bool,
    /// Client built by `create_k8s_client` for the requested kubeconfig context.
    k8s_client: KubernetesClient,
    /// Kubeconfig context requested by the user, if any.
    k8s_context: Option<String>,
    /// Namespace the Materialize instance runs in.
    k8s_namespace: String,
    /// Name of the Materialize instance to target.
    mz_instance_name: String,
    /// Extra namespaces collected from `--additional-k8s-namespace`, if any.
    k8s_additional_namespaces: Option<Vec<String>>,
    /// How to reach Materialize over SQL (port forward or explicit URL).
    mz_connection_info: SelfManagedMzConnectionInfo,
    /// Auth mode resolved by `get_k8s_auth_mode`; `AuthMode::None` if that lookup failed.
    /// Presumably consumed by the HTTP dumper — confirm in `internal_http_dumper`.
    http_connection_auth_mode: AuthMode,
}
163
/// SQL connection details for an emulator reached directly at its container address.
#[derive(Debug, Clone)]
struct ContainerIpInfo {
    /// Address to connect to; `initialize_context` sets it to the container's IP.
    local_address: String,
    /// Port to connect to; `initialize_context` sets it to `DEFAULT_MZ_ENVIRONMENTD_PORT`.
    local_port: i32,
    /// Auth mode used to decide whether credentials are embedded in the connection URL.
    auth_mode: AuthMode,
}
170
/// How to reach the SQL interface of an emulator instance.
#[derive(Debug, Clone)]
enum EmulatorMzConnectionInfo {
    /// Connect to the container's IP address on the default environmentd port.
    ContainerIp(ContainerIpInfo),
    /// Connect with the user-supplied connection URL instead.
    ConnectionUrlOverride(String),
}
176
/// Resolved state for an emulator run, assembled by `initialize_context` from
/// `EmulatorDebugModeArgs` and the global flags.
#[derive(Debug, Clone)]
struct EmulatorContext {
    /// Whether `run` should dump Docker resources via `DockerDumper`.
    dump_docker: bool,
    /// ID of the Docker container being debugged.
    docker_container_id: String,
    /// IP address returned by `docker_dumper::get_container_ip` for that container.
    container_ip: String,
    /// How to reach Materialize over SQL (container IP or explicit URL).
    mz_connection_info: EmulatorMzConnectionInfo,
    /// `AuthMode::Password` when both a username and a password were supplied, otherwise
    /// `AuthMode::None`. Presumably consumed by the HTTP dumper — confirm in
    /// `internal_http_dumper`.
    http_connection_auth_mode: AuthMode,
}
185
/// Mode-specific part of `Context`, mirroring the `DebugModeArgs` subcommand that was
/// selected on the command line.
enum DebugModeContext {
    /// State for debugging a self-managed (Kubernetes) environment.
    SelfManaged(SelfManagedContext),
    /// State for debugging an emulator (Docker) environment.
    Emulator(EmulatorContext),
}
190
/// Everything `run` needs for one invocation of the tool, built by `initialize_context`.
pub struct Context {
    /// Path produced by `format_base_path` from the start time; `run` zips it into
    /// `<base_path>.zip` at the end.
    base_path: PathBuf,
    /// Mode-specific state (self-managed or emulator).
    debug_mode_context: DebugModeContext,
    /// Whether `run` dumps the system catalog over SQL.
    dump_system_catalog: bool,
    /// Value of `--dump-heap-profiles`; not read in this file.
    dump_heap_profiles: bool,
    /// Value of `--dump-prometheus-metrics`; not read in this file.
    dump_prometheus_metrics: bool,
}
198
199#[tokio::main]
200async fn main() {
201    let args: Args = cli::parse_args(CliConfig {
202        // mz_ore::cli::parse_args' env_prefix doesn't apply for subcommand flags. Thus
203        // we manually set each env_prefix to MZ_ for each flag.
204        env_prefix: None,
205        enable_version_flag: true,
206    });
207
208    let start_time = Utc::now();
209    let base_path = format_base_path(start_time);
210
211    // We use tracing_subscriber to display the output of tracing to stdout
212    // and log to a file included in the debug zip.
213    let stdout_layer = tracing_subscriber::fmt::layer()
214        .with_target(false)
215        .without_time();
216
217    if let Ok(file) = create_tracing_log_file(base_path.clone()) {
218        let file_layer = tracing_subscriber::fmt::layer()
219            .with_writer(file)
220            .with_ansi(false);
221
222        let _ = tracing_subscriber::registry()
223            .with(EnvFilter::new(ENV_FILTER))
224            .with(stdout_layer)
225            .with(file_layer)
226            .try_init();
227    } else {
228        let _ = tracing_subscriber::registry()
229            .with(EnvFilter::new(ENV_FILTER))
230            .with(stdout_layer)
231            .try_init();
232    }
233
234    let initialize_then_run = async move {
235        // Preprocess args into contexts
236        let context = initialize_context(args, base_path).await?;
237        run(context).await
238    };
239
240    if let Err(err) = initialize_then_run.await {
241        error!(
242            "mz-debug: fatal: {}\nbacktrace: {}",
243            err.display_with_causes(),
244            err.backtrace()
245        );
246        process::exit(1);
247    }
248}
249
250fn create_mz_connection_url(
251    local_address: String,
252    local_port: i32,
253    credentials: Option<PasswordAuthCredentials>,
254) -> String {
255    let password_auth_segment = if let Some(credentials) = credentials {
256        format!("{}:{}@", credentials.username, credentials.password)
257    } else {
258        "".to_string()
259    };
260    format!(
261        "postgres://{}{}:{}?sslmode=prefer",
262        password_auth_segment, local_address, local_port
263    )
264}
265
266async fn initialize_context(
267    global_args: Args,
268    base_path: PathBuf,
269) -> Result<Context, anyhow::Error> {
270    let debug_mode_context = match &global_args.debug_mode_args {
271        DebugModeArgs::SelfManaged(args) => {
272            let k8s_client = match create_k8s_client(args.k8s_context.clone()).await {
273                Ok(k8s_client) => k8s_client,
274                Err(e) => {
275                    error!("Failed to create k8s client: {}", e);
276                    return Err(e);
277                }
278            };
279
280            let auth_mode = match get_k8s_auth_mode(
281                global_args.mz_username,
282                global_args.mz_password,
283                &k8s_client,
284                &args.k8s_namespace,
285                &args.mz_instance_name,
286            )
287            .await
288            {
289                Ok(auth_mode) => auth_mode,
290                Err(e) => {
291                    warn!("Failed to get auth mode from k8s: {:#}", e);
292                    // By default, set auth mode to None.
293                    AuthMode::None
294                }
295            };
296
297            let mz_connection_info = if let Some(mz_connection_url) = global_args.mz_connection_url
298            {
299                // If the user provides a connection URL, don't bother port forwarding.
300                SelfManagedMzConnectionInfo::ConnectionUrlOverride(mz_connection_url)
301            } else {
302                let create_port_forward_connection = async || {
303                    let port_forwarder = create_pg_wire_port_forwarder(
304                        &k8s_client,
305                        &args.k8s_context,
306                        &args.k8s_namespace,
307                        &args.mz_instance_name,
308                    )
309                    .await?;
310                    port_forwarder.spawn_port_forward().await
311                };
312
313                let port_forward_connection = match create_port_forward_connection().await {
314                    Ok(port_forward_connection) => port_forward_connection,
315                    Err(e) => {
316                        warn!(
317                            "Failed to create port forward connection. Set --mz-connection-url to to a Materialize instance",
318                        );
319                        return Err(e);
320                    }
321                };
322
323                SelfManagedMzConnectionInfo::PortForward(PortForwardConnectionInfo {
324                    connection: port_forward_connection,
325                    auth_mode: auth_mode.clone(),
326                })
327            };
328
329            DebugModeContext::SelfManaged(SelfManagedContext {
330                dump_k8s: args.dump_k8s,
331                k8s_client,
332                k8s_context: args.k8s_context.clone(),
333                k8s_namespace: args.k8s_namespace.clone(),
334                mz_instance_name: args.mz_instance_name.clone(),
335                k8s_additional_namespaces: args.additional_k8s_namespaces.clone(),
336                mz_connection_info,
337                http_connection_auth_mode: auth_mode,
338            })
339        }
340        DebugModeArgs::Emulator(args) => {
341            let container_ip = docker_dumper::get_container_ip(&args.docker_container_id)
342                .await
343                .with_context(|| {
344                    format!(
345                        "Failed to get IP for container {}",
346                        args.docker_container_id
347                    )
348                })?;
349
350            // For the emulator, we assume if a user provides a username and password, they
351            // want to use password authentication.
352            // TODO (debug_tool3): Figure out the auth mode from arguments using docker inspect.
353            let auth_mode = if let (Some(mz_username), Some(mz_password)) =
354                (&global_args.mz_username, &global_args.mz_password)
355            {
356                AuthMode::Password(PasswordAuthCredentials {
357                    username: mz_username.clone(),
358                    password: mz_password.clone(),
359                })
360            } else {
361                AuthMode::None
362            };
363
364            let mz_connection_info = if let Some(mz_connection_url) = global_args.mz_connection_url
365            {
366                EmulatorMzConnectionInfo::ConnectionUrlOverride(mz_connection_url)
367            } else {
368                EmulatorMzConnectionInfo::ContainerIp(ContainerIpInfo {
369                    local_address: container_ip.clone(),
370                    local_port: DEFAULT_MZ_ENVIRONMENTD_PORT,
371                    auth_mode: auth_mode.clone(),
372                })
373            };
374
375            DebugModeContext::Emulator(EmulatorContext {
376                dump_docker: args.dump_docker,
377                docker_container_id: args.docker_container_id.clone(),
378                container_ip,
379                mz_connection_info,
380                http_connection_auth_mode: auth_mode,
381            })
382        }
383    };
384
385    Ok(Context {
386        base_path,
387        debug_mode_context,
388        dump_system_catalog: global_args.dump_system_catalog,
389        dump_heap_profiles: global_args.dump_heap_profiles,
390        dump_prometheus_metrics: global_args.dump_prometheus_metrics,
391    })
392}
393
394async fn run(context: Context) -> Result<(), anyhow::Error> {
395    // Depending on if the user is debugging either a k8s environments or docker environment,
396    // dump the respective system's resources
397    match &context.debug_mode_context {
398        DebugModeContext::SelfManaged(SelfManagedContext {
399            k8s_client,
400            dump_k8s,
401            k8s_context,
402            k8s_namespace,
403            k8s_additional_namespaces,
404            ..
405        }) => {
406            if *dump_k8s {
407                let dumper = K8sDumper::new(
408                    &context,
409                    k8s_client.clone(),
410                    k8s_namespace.clone(),
411                    k8s_additional_namespaces.clone(),
412                    k8s_context.clone(),
413                );
414                dumper.dump_container_resources().await;
415            }
416        }
417        DebugModeContext::Emulator(EmulatorContext {
418            dump_docker,
419            docker_container_id,
420            ..
421        }) => {
422            if *dump_docker {
423                let dumper = DockerDumper::new(&context, docker_container_id.clone());
424                dumper.dump_container_resources().await;
425            }
426        }
427    };
428
429    match &context.debug_mode_context {
430        DebugModeContext::SelfManaged(self_managed_context) => {
431            if let Err(e) = dump_self_managed_http_resources(&context, self_managed_context).await {
432                warn!("Failed to dump self-managed http resources: {:#}", e);
433            }
434        }
435        DebugModeContext::Emulator(emulator_context) => {
436            if let Err(e) = dump_emulator_http_resources(&context, emulator_context).await {
437                warn!("Failed to dump emulator http resources: {:#}", e);
438            }
439        }
440    };
441
442    if context.dump_system_catalog {
443        let connection_url = match &context.debug_mode_context {
444            DebugModeContext::SelfManaged(self_managed_context) => {
445                match &self_managed_context.mz_connection_info {
446                    SelfManagedMzConnectionInfo::PortForward(port_forward) => {
447                        let credentials = match &port_forward.auth_mode {
448                            AuthMode::Password(credentials) => Some(credentials.clone()),
449                            AuthMode::None => None,
450                        };
451                        create_mz_connection_url(
452                            port_forward.connection.local_address.clone(),
453                            port_forward.connection.local_port,
454                            credentials,
455                        )
456                    }
457                    SelfManagedMzConnectionInfo::ConnectionUrlOverride(connection_url) => {
458                        connection_url.clone()
459                    }
460                }
461            }
462            DebugModeContext::Emulator(emulator_context) => {
463                match &emulator_context.mz_connection_info {
464                    EmulatorMzConnectionInfo::ContainerIp(container_ip) => {
465                        let credentials = match &container_ip.auth_mode {
466                            AuthMode::Password(credentials) => Some(credentials.clone()),
467                            AuthMode::None => None,
468                        };
469                        create_mz_connection_url(
470                            container_ip.local_address.clone(),
471                            container_ip.local_port,
472                            credentials,
473                        )
474                    }
475                    EmulatorMzConnectionInfo::ConnectionUrlOverride(connection_url) => {
476                        connection_url.clone()
477                    }
478                }
479            }
480        };
481        let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new(
482            &connection_url,
483            context.base_path.clone(),
484        )
485        .await
486        {
487            Ok(dumper) => Some(dumper),
488            Err(e) => {
489                warn!("Failed to dump system catalog: {:#}", e);
490                None
491            }
492        };
493
494        if let Some(dumper) = catalog_dumper {
495            dumper.dump_all_relations().await;
496        }
497    }
498
499    info!("Zipping debug directory");
500
501    let zip_file_name = format!("{}.zip", &context.base_path.display());
502
503    if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &context.base_path) {
504        warn!("Failed to zip debug directory: {:#}", e);
505    } else {
506        info!("Created zip debug at {:#}", &zip_file_name);
507    }
508
509    Ok(())
510}
511
512/// Creates a k8s client given a context. If no context is provided, the default context is used.
513async fn create_k8s_client(k8s_context: Option<String>) -> Result<KubernetesClient, anyhow::Error> {
514    let kubeconfig_options = KubeConfigOptions {
515        context: k8s_context,
516        ..Default::default()
517    };
518
519    let kubeconfig = Config::from_kubeconfig(&kubeconfig_options).await?;
520
521    let client = KubernetesClient::try_from(kubeconfig)?;
522
523    Ok(client)
524}