1use std::path::PathBuf;
12use std::process;
13use std::sync::LazyLock;
14
15use chrono::{DateTime, Utc};
16use clap::Parser;
17use kube::config::KubeConfigOptions;
18use kube::{Client, Config};
19use mz_build_info::{BuildInfo, build_info};
20use mz_ore::cli::{self, CliConfig};
21use mz_ore::error::ErrorExt;
22use mz_ore::task;
23use tracing::{error, info};
24use tracing_subscriber::EnvFilter;
25use tracing_subscriber::layer::SubscriberExt;
26use tracing_subscriber::util::SubscriberInitExt;
27
28use crate::docker_dumper::DockerDumper;
29use crate::k8s_dumper::K8sDumper;
30use crate::kubectl_port_forwarder::create_kubectl_port_forwarder;
31use crate::utils::{
32 create_tracing_log_file, format_base_path, validate_pg_connection_string, zip_debug_folder,
33};
34
35mod docker_dumper;
36mod k8s_dumper;
37mod kubectl_port_forwarder;
38mod system_catalog_dumper;
39mod utils;
40
41const BUILD_INFO: BuildInfo = build_info!();
42static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
43static ENV_FILTER: &str = "mz_debug=info";
44
45#[derive(Parser, Debug, Clone)]
46pub struct SelfManagedDebugMode {
47 #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
50 dump_k8s: bool,
51 #[clap(
53 long = "k8s-namespace",
54 required_unless_present = "dump_k8s",
56 required_if_eq("dump_k8s", "true"),
57 action = clap::ArgAction::Append
58 )]
59 k8s_namespaces: Vec<String>,
60 #[clap(long, env = "KUBERNETES_CONTEXT")]
62 k8s_context: Option<String>,
63 #[clap(long, default_value = "false", action = clap::ArgAction::Set)]
65 k8s_dump_secret_values: bool,
66 #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
69 auto_port_forward: bool,
70 #[clap(long, default_value = "127.0.0.1")]
72 port_forward_local_address: String,
73 #[clap(long, default_value = "6875")]
75 port_forward_local_port: i32,
76 #[clap(
81 long,
82 env = "MZ_CONNECTION_URL",
83 value_parser = validate_pg_connection_string,
84 )]
85 mz_connection_url: Option<String>,
86}
87
88#[derive(Parser, Debug, Clone)]
89pub struct EmulatorDebugMode {
90 #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
92 dump_docker: bool,
93 #[clap(
95 long,
96 required_unless_present = "dump_docker",
98 required_if_eq("dump_docker", "true")
99 )]
100 docker_container_id: Option<String>,
101 #[clap(
105 long,
106 env = "MZ_CONNECTION_URL",
107 default_value = "postgres://127.0.0.1:6875/materialize?sslmode=prefer",
109 value_parser = validate_pg_connection_string,
110 )]
111 mz_connection_url: String,
112}
113
114#[derive(Parser, Debug, Clone)]
115pub enum DebugMode {
116 SelfManaged(SelfManagedDebugMode),
118 Emulator(EmulatorDebugMode),
120}
121
122#[derive(Parser, Debug, Clone)]
123#[clap(name = "mz-debug", next_line_help = true, version = VERSION.as_str())]
124pub struct Args {
125 #[clap(subcommand)]
126 debug_mode: DebugMode,
127 #[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
129 dump_system_catalog: bool,
130}
131
132pub trait ContainerDumper {
133 fn dump_container_resources(&self) -> impl std::future::Future<Output = ()>;
134}
135pub enum ContainerServiceDumper<'n> {
136 K8s(K8sDumper<'n>),
137 Docker(DockerDumper),
138}
139
140impl<'n> ContainerServiceDumper<'n> {
141 fn new_k8s_dumper(
142 context: &'n Context,
143 client: Client,
144 k8s_namespaces: Vec<String>,
145 k8s_context: Option<String>,
146 k8s_dump_secret_values: bool,
147 ) -> Self {
148 Self::K8s(K8sDumper::new(
149 context,
150 client,
151 k8s_namespaces,
152 k8s_context,
153 k8s_dump_secret_values,
154 ))
155 }
156
157 fn new_docker_dumper(context: &'n Context, docker_container_id: String) -> Self {
158 Self::Docker(DockerDumper::new(context, docker_container_id))
159 }
160}
161
162impl<'n> ContainerDumper for ContainerServiceDumper<'n> {
163 async fn dump_container_resources(&self) {
164 match self {
165 ContainerServiceDumper::K8s(dumper) => dumper.dump_container_resources().await,
166 ContainerServiceDumper::Docker(dumper) => dumper.dump_container_resources().await,
167 }
168 }
169}
170
171#[derive(Clone)]
172pub struct Context {
173 start_time: DateTime<Utc>,
174}
175
176#[tokio::main]
177async fn main() {
178 let args: Args = cli::parse_args(CliConfig {
179 env_prefix: None,
182 enable_version_flag: true,
183 });
184
185 let start_time = Utc::now();
186
187 let stdout_layer = tracing_subscriber::fmt::layer()
190 .with_target(false)
191 .without_time();
192
193 if let Ok(file) = create_tracing_log_file(start_time) {
194 let file_layer = tracing_subscriber::fmt::layer()
195 .with_writer(file)
196 .with_ansi(false);
197
198 let _ = tracing_subscriber::registry()
199 .with(EnvFilter::new(ENV_FILTER))
200 .with(stdout_layer)
201 .with(file_layer)
202 .try_init();
203 } else {
204 let _ = tracing_subscriber::registry()
205 .with(EnvFilter::new(ENV_FILTER))
206 .with(stdout_layer)
207 .try_init();
208 }
209
210 let context = Context { start_time };
211
212 if let Err(err) = run(context, args).await {
213 error!(
214 "mz-debug: fatal: {}\nbacktrace: {}",
215 err.display_with_causes(),
216 err.backtrace()
217 );
218 process::exit(1);
219 }
220}
221
222async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> {
223 let container_system_dumper = match &args.debug_mode {
226 DebugMode::SelfManaged(args) => {
227 if args.dump_k8s {
228 let client = match create_k8s_client(args.k8s_context.clone()).await {
229 Ok(client) => client,
230 Err(e) => {
231 error!("Failed to create k8s client: {}", e);
232 return Err(e);
233 }
234 };
235
236 if args.auto_port_forward {
237 let port_forwarder = create_kubectl_port_forwarder(&client, args).await?;
238 task::spawn(|| "port-forwarding", async move {
239 port_forwarder.port_forward().await;
240 });
241 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
244 }
245
246 let dumper = ContainerServiceDumper::new_k8s_dumper(
247 &context,
248 client,
249 args.k8s_namespaces.clone(),
250 args.k8s_context.clone(),
251 args.k8s_dump_secret_values,
252 );
253 Some(dumper)
254 } else {
255 None
256 }
257 }
258 DebugMode::Emulator(args) => {
259 if args.dump_docker {
260 let docker_container_id = args
261 .docker_container_id
262 .clone()
263 .expect("docker_container_id is required");
264 let dumper =
265 ContainerServiceDumper::new_docker_dumper(&context, docker_container_id);
266 Some(dumper)
267 } else {
268 None
269 }
270 }
271 };
272
273 if let Some(dumper) = container_system_dumper {
274 dumper.dump_container_resources().await;
275 }
276
277 let connection_url = match &args.debug_mode {
278 DebugMode::SelfManaged(args) => kubectl_port_forwarder::create_mz_connection_url(
279 args.port_forward_local_address.clone(),
280 args.port_forward_local_port,
281 args.mz_connection_url.clone(),
282 ),
283 DebugMode::Emulator(args) => args.mz_connection_url.clone(),
284 };
285 if args.dump_system_catalog {
286 let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new(
288 &context,
289 &connection_url,
290 )
291 .await
292 {
293 Ok(dumper) => Some(dumper),
294 Err(e) => {
295 error!("Failed to dump system catalog: {}", e);
296 None
297 }
298 };
299
300 if let Some(dumper) = catalog_dumper {
301 dumper.dump_all_relations().await;
302 }
303 }
304
305 info!("Zipping debug directory");
306
307 let base_path = format_base_path(context.start_time);
308
309 let zip_file_name = format!("{}.zip", &base_path.display());
310
311 if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &base_path) {
312 error!("Failed to zip debug directory: {}", e);
313 } else {
314 info!("Created zip debug at {}", &zip_file_name);
315 }
316
317 Ok(())
318}
319
320async fn create_k8s_client(k8s_context: Option<String>) -> Result<Client, anyhow::Error> {
322 let kubeconfig_options = KubeConfigOptions {
323 context: k8s_context,
324 ..Default::default()
325 };
326
327 let kubeconfig = Config::from_kubeconfig(&kubeconfig_options).await?;
328
329 let client = Client::try_from(kubeconfig)?;
330
331 Ok(client)
332}