mz_debug/
main.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Debug tool for self managed environments.
11use std::path::PathBuf;
12use std::process;
13use std::sync::LazyLock;
14
15use chrono::{DateTime, Utc};
16use clap::Parser;
17use kube::config::KubeConfigOptions;
18use kube::{Client, Config};
19use mz_build_info::{BuildInfo, build_info};
20use mz_ore::cli::{self, CliConfig};
21use mz_ore::error::ErrorExt;
22use mz_ore::task;
23use tracing::{error, info};
24use tracing_subscriber::EnvFilter;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
28use crate::docker_dumper::DockerDumper;
29use crate::k8s_dumper::K8sDumper;
30use crate::kubectl_port_forwarder::create_kubectl_port_forwarder;
31use crate::utils::{
32    create_tracing_log_file, format_base_path, validate_pg_connection_string, zip_debug_folder,
33};
34
35mod docker_dumper;
36mod k8s_dumper;
37mod kubectl_port_forwarder;
38mod system_catalog_dumper;
39mod utils;
40
41const BUILD_INFO: BuildInfo = build_info!();
42static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
43static ENV_FILTER: &str = "mz_debug=info";
44
45#[derive(Parser, Debug, Clone)]
46pub struct SelfManagedDebugMode {
47    // === Kubernetes options. ===
48    /// If true, the tool will dump debug information in Kubernetes cluster such as logs, pod describes, etc.
49    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
50    dump_k8s: bool,
51    /// A list of namespaces to dump.
52    #[clap(
53        long = "k8s-namespace",
54        // We require both `require`s because `required_if_eq` doesn't work for default values.
55        required_unless_present = "dump_k8s",
56        required_if_eq("dump_k8s", "true"),
57        action = clap::ArgAction::Append
58    )]
59    k8s_namespaces: Vec<String>,
60    /// The kubernetes context to use.
61    #[clap(long, env = "KUBERNETES_CONTEXT")]
62    k8s_context: Option<String>,
63    /// If true, the tool will dump the values of secrets in the Kubernetes cluster.
64    #[clap(long, default_value = "false", action = clap::ArgAction::Set)]
65    k8s_dump_secret_values: bool,
66    /// If true, the tool will automatically port-forward the external SQL port in the Kubernetes cluster.
67    /// If dump_k8s is false however, we will not automatically port-forward.
68    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
69    auto_port_forward: bool,
70    /// The address to listen on for the port-forward.
71    #[clap(long, default_value = "127.0.0.1")]
72    port_forward_local_address: String,
73    /// The port to listen on for the port-forward.
74    #[clap(long, default_value = "6875")]
75    port_forward_local_port: i32,
76    /// The URL of the Materialize SQL connection used to dump the system catalog.
77    /// An example URL is `postgres://root@127.0.0.1:6875/materialize?sslmode=disable`.
78    /// By default, we will create a connection URL based on `port_forward_local_address` and `port_forward_local_port`.
79    // TODO(debug_tool3): Allow users to specify the pgconfig via separate variables
80    #[clap(
81        long,
82        env = "MZ_CONNECTION_URL",
83        value_parser = validate_pg_connection_string,
84    )]
85    mz_connection_url: Option<String>,
86}
87
// Options for the `emulator` subcommand. clap uses the `///` comments on the fields as their
// `--help` text, so notes that should not appear in the help output use `//`.
#[derive(Parser, Debug, Clone)]
pub struct EmulatorDebugMode {
    /// If true, the tool will dump debug information of the docker container.
    // `ArgAction::Set` makes this take an explicit value (e.g. `--dump-docker false`) rather
    // than acting as a presence-only flag.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    dump_docker: bool,
    /// The ID of the docker container to dump.
    // Net effect of the two `required_*` settings: required unless `--dump-docker` is passed
    // with a value other than `true`.
    #[clap(
        long,
        // We require both `require`s because `required_if_eq` doesn't work for default values.
        required_unless_present = "dump_docker",
        required_if_eq("dump_docker", "true")
    )]
    docker_container_id: Option<String>,
    /// The URL of the Materialize SQL connection used to dump the system catalog.
    /// An example URL is `postgres://root@127.0.0.1:6875/materialize?sslmode=disable`.
    // TODO(debug_tool3): Allow users to specify the pgconfig via separate variables
    // Also settable through `MZ_CONNECTION_URL`; checked by `validate_pg_connection_string`.
    #[clap(
        long,
        env = "MZ_CONNECTION_URL",
        // We assume that the emulator is running on the default port.
        default_value = "postgres://127.0.0.1:6875/materialize?sslmode=prefer",
        value_parser = validate_pg_connection_string,
    )]
    mz_connection_url: String,
}
113
// Subcommand selecting which kind of environment to debug. clap derives the subcommand names
// from the variant names (`self-managed`, `emulator`), and the `///` comments on the variants
// are their `--help` text.
#[derive(Parser, Debug, Clone)]
pub enum DebugMode {
    /// Debug self-managed environments
    SelfManaged(SelfManagedDebugMode),
    /// Debug emulator environments
    Emulator(EmulatorDebugMode),
}
121
// Top-level command-line arguments for `mz-debug`. clap uses the `///` comments on the fields
// as their `--help` text, so notes that should not appear in the help output use `//`.
#[derive(Parser, Debug, Clone)]
#[clap(name = "mz-debug", next_line_help = true, version = VERSION.as_str())]
pub struct Args {
    // Which environment to debug, together with that environment's own options.
    #[clap(subcommand)]
    debug_mode: DebugMode,
    /// If true, the tool will dump the system catalog in Materialize.
    // `global = true` lets this flag also be given after the subcommand name.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
    dump_system_catalog: bool,
}
131
/// A source of debug information about the resources of a container runtime.
pub trait ContainerDumper {
    /// Returns a future that dumps this dumper's resources.
    ///
    /// The future resolves to `()`: implementations must handle or report their own
    /// failures, because none are returned to the caller.
    fn dump_container_resources(&self) -> impl std::future::Future<Output = ()>;
}
135pub enum ContainerServiceDumper<'n> {
136    K8s(K8sDumper<'n>),
137    Docker(DockerDumper),
138}
139
140impl<'n> ContainerServiceDumper<'n> {
141    fn new_k8s_dumper(
142        context: &'n Context,
143        client: Client,
144        k8s_namespaces: Vec<String>,
145        k8s_context: Option<String>,
146        k8s_dump_secret_values: bool,
147    ) -> Self {
148        Self::K8s(K8sDumper::new(
149            context,
150            client,
151            k8s_namespaces,
152            k8s_context,
153            k8s_dump_secret_values,
154        ))
155    }
156
157    fn new_docker_dumper(context: &'n Context, docker_container_id: String) -> Self {
158        Self::Docker(DockerDumper::new(context, docker_container_id))
159    }
160}
161
162impl<'n> ContainerDumper for ContainerServiceDumper<'n> {
163    async fn dump_container_resources(&self) {
164        match self {
165            ContainerServiceDumper::K8s(dumper) => dumper.dump_container_resources().await,
166            ContainerServiceDumper::Docker(dumper) => dumper.dump_container_resources().await,
167        }
168    }
169}
170
171#[derive(Clone)]
172pub struct Context {
173    start_time: DateTime<Utc>,
174}
175
176#[tokio::main]
177async fn main() {
178    let args: Args = cli::parse_args(CliConfig {
179        // mz_ore::cli::parse_args' env_prefix doesn't apply for subcommand flags. Thus
180        // we manually set each env_prefix to MZ_ for each flag.
181        env_prefix: None,
182        enable_version_flag: true,
183    });
184
185    let start_time = Utc::now();
186
187    // We use tracing_subscriber to display the output of tracing to stdout
188    // and log to a file included in the debug zip.
189    let stdout_layer = tracing_subscriber::fmt::layer()
190        .with_target(false)
191        .without_time();
192
193    if let Ok(file) = create_tracing_log_file(start_time) {
194        let file_layer = tracing_subscriber::fmt::layer()
195            .with_writer(file)
196            .with_ansi(false);
197
198        let _ = tracing_subscriber::registry()
199            .with(EnvFilter::new(ENV_FILTER))
200            .with(stdout_layer)
201            .with(file_layer)
202            .try_init();
203    } else {
204        let _ = tracing_subscriber::registry()
205            .with(EnvFilter::new(ENV_FILTER))
206            .with(stdout_layer)
207            .try_init();
208    }
209
210    let context = Context { start_time };
211
212    if let Err(err) = run(context, args).await {
213        error!(
214            "mz-debug: fatal: {}\nbacktrace: {}",
215            err.display_with_causes(),
216            err.backtrace()
217        );
218        process::exit(1);
219    }
220}
221
222async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> {
223    // Depending on if the user is debugging either a k8s environments or docker environment,
224    // dump the respective system's resources.
225    let container_system_dumper = match &args.debug_mode {
226        DebugMode::SelfManaged(args) => {
227            if args.dump_k8s {
228                let client = match create_k8s_client(args.k8s_context.clone()).await {
229                    Ok(client) => client,
230                    Err(e) => {
231                        error!("Failed to create k8s client: {}", e);
232                        return Err(e);
233                    }
234                };
235
236                if args.auto_port_forward {
237                    let port_forwarder = create_kubectl_port_forwarder(&client, args).await?;
238                    task::spawn(|| "port-forwarding", async move {
239                        port_forwarder.port_forward().await;
240                    });
241                    // There may be a delay between when the port forwarding process starts and when it's ready
242                    // to use. We wait a few seconds to ensure that port forwarding is ready.
243                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
244                }
245
246                let dumper = ContainerServiceDumper::new_k8s_dumper(
247                    &context,
248                    client,
249                    args.k8s_namespaces.clone(),
250                    args.k8s_context.clone(),
251                    args.k8s_dump_secret_values,
252                );
253                Some(dumper)
254            } else {
255                None
256            }
257        }
258        DebugMode::Emulator(args) => {
259            if args.dump_docker {
260                let docker_container_id = args
261                    .docker_container_id
262                    .clone()
263                    .expect("docker_container_id is required");
264                let dumper =
265                    ContainerServiceDumper::new_docker_dumper(&context, docker_container_id);
266                Some(dumper)
267            } else {
268                None
269            }
270        }
271    };
272
273    if let Some(dumper) = container_system_dumper {
274        dumper.dump_container_resources().await;
275    }
276
277    let connection_url = match &args.debug_mode {
278        DebugMode::SelfManaged(args) => kubectl_port_forwarder::create_mz_connection_url(
279            args.port_forward_local_address.clone(),
280            args.port_forward_local_port,
281            args.mz_connection_url.clone(),
282        ),
283        DebugMode::Emulator(args) => args.mz_connection_url.clone(),
284    };
285    if args.dump_system_catalog {
286        // Dump the system catalog.
287        let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new(
288            &context,
289            &connection_url,
290        )
291        .await
292        {
293            Ok(dumper) => Some(dumper),
294            Err(e) => {
295                error!("Failed to dump system catalog: {}", e);
296                None
297            }
298        };
299
300        if let Some(dumper) = catalog_dumper {
301            dumper.dump_all_relations().await;
302        }
303    }
304
305    info!("Zipping debug directory");
306
307    let base_path = format_base_path(context.start_time);
308
309    let zip_file_name = format!("{}.zip", &base_path.display());
310
311    if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &base_path) {
312        error!("Failed to zip debug directory: {}", e);
313    } else {
314        info!("Created zip debug at {}", &zip_file_name);
315    }
316
317    Ok(())
318}
319
320/// Creates a k8s client given a context. If no context is provided, the default context is used.
321async fn create_k8s_client(k8s_context: Option<String>) -> Result<Client, anyhow::Error> {
322    let kubeconfig_options = KubeConfigOptions {
323        context: k8s_context,
324        ..Default::default()
325    };
326
327    let kubeconfig = Config::from_kubeconfig(&kubeconfig_options).await?;
328
329    let client = Client::try_from(kubeconfig)?;
330
331    Ok(client)
332}