1use std::path::PathBuf;
12use std::process;
13use std::sync::LazyLock;
14
15use anyhow::Context as AnyhowContext;
16use chrono::Utc;
17use clap::Parser;
18use kube::config::KubeConfigOptions;
19use kube::{Client as KubernetesClient, Config};
20use mz_build_info::{BuildInfo, build_info};
21use mz_ore::cli::{self, CliConfig};
22use mz_ore::error::ErrorExt;
23use tracing::{error, info, warn};
24use tracing_subscriber::EnvFilter;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
28use crate::docker_dumper::DockerDumper;
29use crate::internal_http_dumper::{dump_emulator_http_resources, dump_self_managed_http_resources};
30use crate::k8s_dumper::K8sDumper;
31use crate::kubectl_port_forwarder::{PortForwardConnection, create_pg_wire_port_forwarder};
32use crate::utils::{
33 create_tracing_log_file, format_base_path, get_k8s_auth_mode, validate_pg_connection_string,
34 zip_debug_folder,
35};
36
37mod docker_dumper;
38mod internal_http_dumper;
39mod k8s_dumper;
40mod kubectl_port_forwarder;
41mod system_catalog_dumper;
42mod utils;
43
/// Build metadata for this binary, produced by the `build_info!` macro.
const BUILD_INFO: BuildInfo = build_info!();
/// Human-readable version string derived from [`BUILD_INFO`]; computed lazily on
/// first access and handed to clap as the `version` of [`Args`].
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
/// Directive string passed to `EnvFilter::new` when `main` installs the tracing
/// subscriber.
static ENV_FILTER: &str = "mz_debug=info";
/// Port used for the emulator's connection when no connection URL override is
/// supplied (see the `ContainerIp` case in `initialize_context`).
pub static DEFAULT_MZ_ENVIRONMENTD_PORT: i32 = 6875;
48
// Command-line arguments specific to the `SelfManaged` debug mode.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into user-visible help text, which would change the CLI output.
#[derive(Parser, Debug, Clone)]
pub struct SelfManagedDebugModeArgs {
    // Whether `run` invokes the `K8sDumper`. Defaults to "true"; `ArgAction::Set`
    // means the flag takes an explicit boolean value.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    dump_k8s: bool,

    // Required. Namespace handed to the auth-mode lookup, the pgwire port
    // forwarder and the `K8sDumper`.
    #[clap(long)]
    k8s_namespace: String,
    // Required. Instance name handed to the auth-mode lookup and the pgwire
    // port forwarder.
    #[clap(long)]
    mz_instance_name: String,
    // Repeatable `--additional-k8s-namespace` flag (`ArgAction::Append`); the
    // collected values are forwarded to the `K8sDumper`.
    #[clap(
        long = "additional-k8s-namespace",
        action = clap::ArgAction::Append
    )]
    additional_k8s_namespaces: Option<Vec<String>>,
    // Optional kubeconfig context, also readable from the `KUBERNETES_CONTEXT`
    // environment variable. Used when building the Kubernetes client.
    #[clap(long, env = "KUBERNETES_CONTEXT")]
    k8s_context: Option<String>,
}
74
// Command-line arguments specific to the `Emulator` debug mode.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into user-visible help text, which would change the CLI output.
#[derive(Parser, Debug, Clone)]
pub struct EmulatorDebugModeArgs {
    // Whether `run` invokes the `DockerDumper`. Defaults to "true";
    // `ArgAction::Set` means the flag takes an explicit boolean value.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    dump_docker: bool,
    // Required. Container whose IP is looked up in `initialize_context` and
    // which is handed to the `DockerDumper`.
    #[clap(long)]
    docker_container_id: String,
}
84
// The debug mode selected on the command line; used as the `subcommand` of
// `Args`. Each variant carries the arguments for its mode.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into user-visible help text, which would change the CLI output.
#[derive(Parser, Debug, Clone)]
pub enum DebugModeArgs {
    // Debug an instance reached through a Kubernetes client.
    SelfManaged(SelfManagedDebugModeArgs),
    // Debug an instance running in a Docker container.
    Emulator(EmulatorDebugModeArgs),
}
92
// Top-level command-line arguments: the mode subcommand plus options marked
// `global = true`.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into user-visible help text, which would change the CLI output.
#[derive(Parser, Debug, Clone)]
#[clap(name = "mz-debug", next_line_help = true, version = VERSION.as_str())]
pub struct Args {
    // Selected debug mode and its mode-specific arguments.
    #[clap(subcommand)]
    debug_mode_args: DebugModeArgs,
    // Whether `run` dumps the system catalog. Defaults to "true".
    #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
    dump_system_catalog: bool,
    // Copied into `Context`; consumed outside this file (presumably by the
    // HTTP dumpers — verify). Defaults to "true".
    #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
    dump_heap_profiles: bool,
    // Copied into `Context`; consumed outside this file (presumably by the
    // HTTP dumpers — verify). Defaults to "true".
    #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
    dump_prometheus_metrics: bool,
    // Optional username, also readable from `MZ_USERNAME`.
    #[clap(long, env = "MZ_USERNAME", global = true)]
    mz_username: Option<String>,
    // Optional password, also readable from `MZ_PASSWORD`.
    #[clap(long, env = "MZ_PASSWORD", global = true)]
    mz_password: Option<String>,
    // Optional connection URL, also readable from `MZ_CONNECTION_URL` and
    // checked by `validate_pg_connection_string`. When present it is used
    // instead of the port-forward / container-IP connection.
    #[clap(
        long,
        env = "MZ_CONNECTION_URL",
        value_parser = validate_pg_connection_string,
        global = true
    )]
    mz_connection_url: Option<String>,
}
126
/// Interface for dumpers that collect resources from a container environment.
///
/// `run` calls [`ContainerDumper::dump_container_resources`] on `K8sDumper` and
/// `DockerDumper`.
pub trait ContainerDumper {
    /// Dumps the container resources. The returned future resolves to `()`, so
    /// failures are not reported to the caller through the return value.
    fn dump_container_resources(&self) -> impl std::future::Future<Output = ()>;
}
130
/// Username/password pair used for password authentication.
#[derive(Debug, Clone)]
pub struct PasswordAuthCredentials {
    /// Username placed in the user part of the connection URL.
    pub username: String,
    /// Password placed in the password part of the connection URL.
    pub password: String,
}
136
/// How connections to the instance authenticate.
#[derive(Debug, Clone)]
pub enum AuthMode {
    /// No credentials are sent.
    None,
    /// Authenticate with the given username and password.
    Password(PasswordAuthCredentials),
}
142
/// A spawned port-forward connection together with the auth mode to use on it.
struct PortForwardConnectionInfo {
    /// Port-forward connection; `run` reads its `local_address` and `local_port`.
    connection: PortForwardConnection,
    /// Auth mode used when building the connection URL for this connection.
    auth_mode: AuthMode,
}
147
/// How the self-managed instance is reached for the system catalog dump.
enum SelfManagedMzConnectionInfo {
    /// Connect through a port forward created in `initialize_context`.
    PortForward(PortForwardConnectionInfo),
    /// Use the connection URL supplied via `mz_connection_url` verbatim.
    ConnectionUrlOverride(String),
}
152
/// Resolved state for the self-managed debug mode, built by `initialize_context`.
struct SelfManagedContext {
    /// Whether `run` invokes the `K8sDumper`.
    dump_k8s: bool,
    /// Kubernetes client created from the kubeconfig.
    k8s_client: KubernetesClient,
    /// Optional kubeconfig context copied from the CLI arguments.
    k8s_context: Option<String>,
    /// Namespace copied from the CLI arguments.
    k8s_namespace: String,
    /// Instance name copied from the CLI arguments.
    mz_instance_name: String,
    /// Extra namespaces forwarded to the `K8sDumper`.
    k8s_additional_namespaces: Option<Vec<String>>,
    /// How to connect for the system catalog dump.
    mz_connection_info: SelfManagedMzConnectionInfo,
    /// Auth mode resolved in `initialize_context`; named for HTTP connections,
    /// which are made outside this file.
    http_connection_auth_mode: AuthMode,
}
163
/// Address, port and auth mode used to build the emulator's connection URL.
#[derive(Debug, Clone)]
struct ContainerIpInfo {
    /// Host part of the connection URL; set to the container IP.
    local_address: String,
    /// Port part of the connection URL; set to `DEFAULT_MZ_ENVIRONMENTD_PORT`.
    local_port: i32,
    /// Auth mode used when building the connection URL.
    auth_mode: AuthMode,
}
170
/// How the emulator instance is reached for the system catalog dump.
#[derive(Debug, Clone)]
enum EmulatorMzConnectionInfo {
    /// Connect directly to the container's IP address.
    ContainerIp(ContainerIpInfo),
    /// Use the connection URL supplied via `mz_connection_url` verbatim.
    ConnectionUrlOverride(String),
}
176
/// Resolved state for the emulator debug mode, built by `initialize_context`.
#[derive(Debug, Clone)]
struct EmulatorContext {
    /// Whether `run` invokes the `DockerDumper`.
    dump_docker: bool,
    /// Container id copied from the CLI arguments.
    docker_container_id: String,
    /// IP address returned by `docker_dumper::get_container_ip`.
    container_ip: String,
    /// How to connect for the system catalog dump.
    mz_connection_info: EmulatorMzConnectionInfo,
    /// Auth mode derived from the username/password arguments; named for HTTP
    /// connections, which are made outside this file.
    http_connection_auth_mode: AuthMode,
}
185
/// Mode-specific resolved state; mirrors the variants of `DebugModeArgs`.
enum DebugModeContext {
    /// State for the self-managed mode.
    SelfManaged(SelfManagedContext),
    /// State for the emulator mode.
    Emulator(EmulatorContext),
}
190
/// Everything `run` needs: the output location, the mode-specific state and
/// the global dump toggles.
pub struct Context {
    /// Base output path for this run; `run` zips it into `<base_path>.zip`.
    base_path: PathBuf,
    /// Mode-specific state.
    debug_mode_context: DebugModeContext,
    /// Whether `run` dumps the system catalog.
    dump_system_catalog: bool,
    /// Copied from the CLI arguments; consumed outside this file.
    dump_heap_profiles: bool,
    /// Copied from the CLI arguments; consumed outside this file.
    dump_prometheus_metrics: bool,
}
198
199#[tokio::main]
200async fn main() {
201 let args: Args = cli::parse_args(CliConfig {
202 env_prefix: None,
205 enable_version_flag: true,
206 });
207
208 let start_time = Utc::now();
209 let base_path = format_base_path(start_time);
210
211 let stdout_layer = tracing_subscriber::fmt::layer()
214 .with_target(false)
215 .without_time();
216
217 if let Ok(file) = create_tracing_log_file(base_path.clone()) {
218 let file_layer = tracing_subscriber::fmt::layer()
219 .with_writer(file)
220 .with_ansi(false);
221
222 let _ = tracing_subscriber::registry()
223 .with(EnvFilter::new(ENV_FILTER))
224 .with(stdout_layer)
225 .with(file_layer)
226 .try_init();
227 } else {
228 let _ = tracing_subscriber::registry()
229 .with(EnvFilter::new(ENV_FILTER))
230 .with(stdout_layer)
231 .try_init();
232 }
233
234 let initialize_then_run = async move {
235 let context = initialize_context(args, base_path).await?;
237 run(context).await
238 };
239
240 if let Err(err) = initialize_then_run.await {
241 error!(
242 "mz-debug: fatal: {}\nbacktrace: {}",
243 err.display_with_causes(),
244 err.backtrace()
245 );
246 process::exit(1);
247 }
248}
249
250fn create_mz_connection_url(
251 local_address: String,
252 local_port: i32,
253 credentials: Option<PasswordAuthCredentials>,
254) -> String {
255 let password_auth_segment = if let Some(credentials) = credentials {
256 format!("{}:{}@", credentials.username, credentials.password)
257 } else {
258 "".to_string()
259 };
260 format!(
261 "postgres://{}{}:{}?sslmode=prefer",
262 password_auth_segment, local_address, local_port
263 )
264}
265
266async fn initialize_context(
267 global_args: Args,
268 base_path: PathBuf,
269) -> Result<Context, anyhow::Error> {
270 let debug_mode_context = match &global_args.debug_mode_args {
271 DebugModeArgs::SelfManaged(args) => {
272 let k8s_client = match create_k8s_client(args.k8s_context.clone()).await {
273 Ok(k8s_client) => k8s_client,
274 Err(e) => {
275 error!("Failed to create k8s client: {}", e);
276 return Err(e);
277 }
278 };
279
280 let auth_mode = match get_k8s_auth_mode(
281 global_args.mz_username,
282 global_args.mz_password,
283 &k8s_client,
284 &args.k8s_namespace,
285 &args.mz_instance_name,
286 )
287 .await
288 {
289 Ok(auth_mode) => auth_mode,
290 Err(e) => {
291 warn!("Failed to get auth mode from k8s: {:#}", e);
292 AuthMode::None
294 }
295 };
296
297 let mz_connection_info = if let Some(mz_connection_url) = global_args.mz_connection_url
298 {
299 SelfManagedMzConnectionInfo::ConnectionUrlOverride(mz_connection_url)
301 } else {
302 let create_port_forward_connection = async || {
303 let port_forwarder = create_pg_wire_port_forwarder(
304 &k8s_client,
305 &args.k8s_context,
306 &args.k8s_namespace,
307 &args.mz_instance_name,
308 )
309 .await?;
310 port_forwarder.spawn_port_forward().await
311 };
312
313 let port_forward_connection = match create_port_forward_connection().await {
314 Ok(port_forward_connection) => port_forward_connection,
315 Err(e) => {
316 warn!(
317 "Failed to create port forward connection. Set --mz-connection-url to to a Materialize instance",
318 );
319 return Err(e);
320 }
321 };
322
323 SelfManagedMzConnectionInfo::PortForward(PortForwardConnectionInfo {
324 connection: port_forward_connection,
325 auth_mode: auth_mode.clone(),
326 })
327 };
328
329 DebugModeContext::SelfManaged(SelfManagedContext {
330 dump_k8s: args.dump_k8s,
331 k8s_client,
332 k8s_context: args.k8s_context.clone(),
333 k8s_namespace: args.k8s_namespace.clone(),
334 mz_instance_name: args.mz_instance_name.clone(),
335 k8s_additional_namespaces: args.additional_k8s_namespaces.clone(),
336 mz_connection_info,
337 http_connection_auth_mode: auth_mode,
338 })
339 }
340 DebugModeArgs::Emulator(args) => {
341 let container_ip = docker_dumper::get_container_ip(&args.docker_container_id)
342 .await
343 .with_context(|| {
344 format!(
345 "Failed to get IP for container {}",
346 args.docker_container_id
347 )
348 })?;
349
350 let auth_mode = if let (Some(mz_username), Some(mz_password)) =
354 (&global_args.mz_username, &global_args.mz_password)
355 {
356 AuthMode::Password(PasswordAuthCredentials {
357 username: mz_username.clone(),
358 password: mz_password.clone(),
359 })
360 } else {
361 AuthMode::None
362 };
363
364 let mz_connection_info = if let Some(mz_connection_url) = global_args.mz_connection_url
365 {
366 EmulatorMzConnectionInfo::ConnectionUrlOverride(mz_connection_url)
367 } else {
368 EmulatorMzConnectionInfo::ContainerIp(ContainerIpInfo {
369 local_address: container_ip.clone(),
370 local_port: DEFAULT_MZ_ENVIRONMENTD_PORT,
371 auth_mode: auth_mode.clone(),
372 })
373 };
374
375 DebugModeContext::Emulator(EmulatorContext {
376 dump_docker: args.dump_docker,
377 docker_container_id: args.docker_container_id.clone(),
378 container_ip,
379 mz_connection_info,
380 http_connection_auth_mode: auth_mode,
381 })
382 }
383 };
384
385 Ok(Context {
386 base_path,
387 debug_mode_context,
388 dump_system_catalog: global_args.dump_system_catalog,
389 dump_heap_profiles: global_args.dump_heap_profiles,
390 dump_prometheus_metrics: global_args.dump_prometheus_metrics,
391 })
392}
393
394async fn run(context: Context) -> Result<(), anyhow::Error> {
395 match &context.debug_mode_context {
398 DebugModeContext::SelfManaged(SelfManagedContext {
399 k8s_client,
400 dump_k8s,
401 k8s_context,
402 k8s_namespace,
403 k8s_additional_namespaces,
404 ..
405 }) => {
406 if *dump_k8s {
407 let dumper = K8sDumper::new(
408 &context,
409 k8s_client.clone(),
410 k8s_namespace.clone(),
411 k8s_additional_namespaces.clone(),
412 k8s_context.clone(),
413 );
414 dumper.dump_container_resources().await;
415 }
416 }
417 DebugModeContext::Emulator(EmulatorContext {
418 dump_docker,
419 docker_container_id,
420 ..
421 }) => {
422 if *dump_docker {
423 let dumper = DockerDumper::new(&context, docker_container_id.clone());
424 dumper.dump_container_resources().await;
425 }
426 }
427 };
428
429 match &context.debug_mode_context {
430 DebugModeContext::SelfManaged(self_managed_context) => {
431 if let Err(e) = dump_self_managed_http_resources(&context, self_managed_context).await {
432 warn!("Failed to dump self-managed http resources: {:#}", e);
433 }
434 }
435 DebugModeContext::Emulator(emulator_context) => {
436 if let Err(e) = dump_emulator_http_resources(&context, emulator_context).await {
437 warn!("Failed to dump emulator http resources: {:#}", e);
438 }
439 }
440 };
441
442 if context.dump_system_catalog {
443 let connection_url = match &context.debug_mode_context {
444 DebugModeContext::SelfManaged(self_managed_context) => {
445 match &self_managed_context.mz_connection_info {
446 SelfManagedMzConnectionInfo::PortForward(port_forward) => {
447 let credentials = match &port_forward.auth_mode {
448 AuthMode::Password(credentials) => Some(credentials.clone()),
449 AuthMode::None => None,
450 };
451 create_mz_connection_url(
452 port_forward.connection.local_address.clone(),
453 port_forward.connection.local_port,
454 credentials,
455 )
456 }
457 SelfManagedMzConnectionInfo::ConnectionUrlOverride(connection_url) => {
458 connection_url.clone()
459 }
460 }
461 }
462 DebugModeContext::Emulator(emulator_context) => {
463 match &emulator_context.mz_connection_info {
464 EmulatorMzConnectionInfo::ContainerIp(container_ip) => {
465 let credentials = match &container_ip.auth_mode {
466 AuthMode::Password(credentials) => Some(credentials.clone()),
467 AuthMode::None => None,
468 };
469 create_mz_connection_url(
470 container_ip.local_address.clone(),
471 container_ip.local_port,
472 credentials,
473 )
474 }
475 EmulatorMzConnectionInfo::ConnectionUrlOverride(connection_url) => {
476 connection_url.clone()
477 }
478 }
479 }
480 };
481 let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new(
482 &connection_url,
483 context.base_path.clone(),
484 )
485 .await
486 {
487 Ok(dumper) => Some(dumper),
488 Err(e) => {
489 warn!("Failed to dump system catalog: {:#}", e);
490 None
491 }
492 };
493
494 if let Some(dumper) = catalog_dumper {
495 dumper.dump_all_relations().await;
496 }
497 }
498
499 info!("Zipping debug directory");
500
501 let zip_file_name = format!("{}.zip", &context.base_path.display());
502
503 if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &context.base_path) {
504 warn!("Failed to zip debug directory: {:#}", e);
505 } else {
506 info!("Created zip debug at {:#}", &zip_file_name);
507 }
508
509 Ok(())
510}
511
512async fn create_k8s_client(k8s_context: Option<String>) -> Result<KubernetesClient, anyhow::Error> {
514 let kubeconfig_options = KubeConfigOptions {
515 context: k8s_context,
516 ..Default::default()
517 };
518
519 let kubeconfig = Config::from_kubeconfig(&kubeconfig_options).await?;
520
521 let client = KubernetesClient::try_from(kubeconfig)?;
522
523 Ok(client)
524}