use std::path::PathBuf;
use std::process;
use std::sync::LazyLock;
use chrono::{DateTime, Utc};
use clap::Parser;
use kube::config::KubeConfigOptions;
use kube::{Client, Config};
use mz_build_info::{build_info, BuildInfo};
use mz_ore::cli::{self, CliConfig};
use mz_ore::error::ErrorExt;
use mz_ore::task;
use tracing::{error, info};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use crate::docker_dumper::DockerDumper;
use crate::k8s_dumper::K8sDumper;
use crate::kubectl_port_forwarder::create_kubectl_port_forwarder;
use crate::utils::{
create_tracing_log_file, format_base_path, validate_pg_connection_string, zip_debug_folder,
};
mod docker_dumper;
mod k8s_dumper;
mod kubectl_port_forwarder;
mod system_catalog_dumper;
mod utils;
/// Build metadata for this binary, produced by the `build_info!` macro.
const BUILD_INFO: BuildInfo = build_info!();
/// Human-readable version string, computed on first access and used as the CLI `version` value.
static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
/// Filter directive handed to `EnvFilter::new` when the tracing subscriber is set up in `main`.
static ENV_FILTER: &str = "mz_debug=info";
/// Options for debugging a self-managed Materialize deployment on Kubernetes.
// The `///` comments on the fields below double as the `--help` text clap shows for each flag.
#[derive(Parser, Debug, Clone)]
pub struct SelfManagedDebugMode {
/// If true, dump debug information from the Kubernetes cluster.
#[clap(long, default_value = "true", action = clap::ArgAction::Set)]
dump_k8s: bool,
/// A Kubernetes namespace to dump. Repeat the flag to dump several namespaces.
/// Required when `--dump-k8s` is true.
// Both `required_*` rules are needed: `dump_k8s` defaults to true, so the namespace must be
// required when the flag is omitted as well as when it is explicitly set to "true".
#[clap(
long = "k8s-namespace",
required_unless_present = "dump_k8s",
required_if_eq("dump_k8s", "true"),
action = clap::ArgAction::Append
)]
k8s_namespaces: Vec<String>,
/// The Kubernetes context to use.
#[clap(long, env = "KUBERNETES_CONTEXT")]
k8s_context: Option<String>,
/// If true, include secret values in the Kubernetes dump.
#[clap(long, default_value = "false", action = clap::ArgAction::Set)]
k8s_dump_secret_values: bool,
/// If true, automatically start a kubectl port-forward before dumping.
/// Only takes effect when `--dump-k8s` is true.
#[clap(long, default_value = "true", action = clap::ArgAction::Set)]
auto_port_forward: bool,
/// Local address used for the port forward.
#[clap(long, default_value = "127.0.0.1")]
port_forward_local_address: String,
/// Local port used for the port forward.
#[clap(long, default_value = "6875")]
port_forward_local_port: i32,
/// Connection URL of the Materialize SQL endpoint used for the system catalog dump.
// Checked at parse time by `validate_pg_connection_string`. When present it is passed, together
// with the port-forward address and port, to `create_mz_connection_url`.
#[clap(
long,
env = "MZ_CONNECTION_URL",
value_parser = validate_pg_connection_string,
)]
mz_connection_url: Option<String>,
}
/// Options for debugging a Materialize emulator running in a Docker container.
// The `///` comments on the fields below double as the `--help` text clap shows for each flag.
#[derive(Parser, Debug, Clone)]
pub struct EmulatorDebugMode {
/// If true, dump debug information from the Docker container.
#[clap(long, default_value = "true", action = clap::ArgAction::Set)]
dump_docker: bool,
/// ID of the Docker container to dump. Required when `--dump-docker` is true.
// Both `required_*` rules are needed: `dump_docker` defaults to true, so the ID must be
// required when the flag is omitted as well as when it is explicitly set to "true".
#[clap(
long,
required_unless_present = "dump_docker",
required_if_eq("dump_docker", "true")
)]
docker_container_id: Option<String>,
/// Connection URL of the Materialize SQL endpoint used for the system catalog dump.
// Checked at parse time by `validate_pg_connection_string`.
#[clap(
long,
env = "MZ_CONNECTION_URL",
default_value = "postgres://127.0.0.1:6875/materialize?sslmode=prefer",
value_parser = validate_pg_connection_string,
)]
mz_connection_url: String,
}
/// The kind of Materialize environment to debug; each variant is a CLI subcommand.
#[derive(Parser, Debug, Clone)]
pub enum DebugMode {
/// Debug a self-managed Materialize deployment on Kubernetes.
SelfManaged(SelfManagedDebugMode),
/// Debug a Materialize emulator running in Docker.
Emulator(EmulatorDebugMode),
}
/// Collects debug information from a Materialize environment and zips it up.
#[derive(Parser, Debug, Clone)]
#[clap(name = "mz-debug", next_line_help = true, version = VERSION.as_str())]
pub struct Args {
// Selects the environment-specific subcommand and its options.
#[clap(subcommand)]
debug_mode: DebugMode,
/// If true, dump the system catalog over the Materialize SQL connection.
// `global = true` lets the flag be given alongside any subcommand.
#[clap(long, default_value = "true", action = clap::ArgAction::Set, global = true)]
dump_system_catalog: bool,
}
/// Common interface for dumpers that collect debug resources from a container platform.
pub trait ContainerDumper {
/// Returns a future that performs the dump.
///
/// The future resolves to `()`, so failures are not reported to the caller through it.
fn dump_container_resources(&self) -> impl std::future::Future<Output = ()>;
}
/// The concrete dumper selected for a run: Kubernetes or Docker.
///
/// `'n` is the lifetime of the `Context` borrow handed to the constructors.
pub enum ContainerServiceDumper<'n> {
/// Dumps resources from a Kubernetes cluster.
K8s(K8sDumper<'n>),
/// Dumps resources from a Docker container.
Docker(DockerDumper),
}
impl<'n> ContainerServiceDumper<'n> {
fn new_k8s_dumper(
context: &'n Context,
client: Client,
k8s_namespaces: Vec<String>,
k8s_context: Option<String>,
k8s_dump_secret_values: bool,
) -> Self {
Self::K8s(K8sDumper::new(
context,
client,
k8s_namespaces,
k8s_context,
k8s_dump_secret_values,
))
}
fn new_docker_dumper(context: &'n Context, docker_container_id: String) -> Self {
Self::Docker(DockerDumper::new(context, docker_container_id))
}
}
impl ContainerDumper for ContainerServiceDumper<'_> {
    /// Delegates to whichever platform-specific dumper this value wraps.
    async fn dump_container_resources(&self) {
        match self {
            Self::K8s(k8s) => k8s.dump_container_resources().await,
            Self::Docker(docker) => docker.dump_container_resources().await,
        }
    }
}
/// State shared by the dumpers for a single invocation of the tool.
#[derive(Clone)]
pub struct Context {
/// Timestamp taken in `main` right after argument parsing. It is used to create the tracing
/// log file and to derive the base path of the output that gets zipped.
start_time: DateTime<Utc>,
}
#[tokio::main]
async fn main() {
let args: Args = cli::parse_args(CliConfig {
env_prefix: None,
enable_version_flag: true,
});
let start_time = Utc::now();
let stdout_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.without_time();
if let Ok(file) = create_tracing_log_file(start_time) {
let file_layer = tracing_subscriber::fmt::layer()
.with_writer(file)
.with_ansi(false);
let _ = tracing_subscriber::registry()
.with(EnvFilter::new(ENV_FILTER))
.with(stdout_layer)
.with(file_layer)
.try_init();
} else {
let _ = tracing_subscriber::registry()
.with(EnvFilter::new(ENV_FILTER))
.with(stdout_layer)
.try_init();
}
let context = Context { start_time };
if let Err(err) = run(context, args).await {
error!(
"mz-debug: fatal: {}\nbacktrace: {}",
err.display_with_causes(),
err.backtrace()
);
process::exit(1);
}
}
async fn run(context: Context, args: Args) -> Result<(), anyhow::Error> {
let container_system_dumper = match &args.debug_mode {
DebugMode::SelfManaged(args) => {
if args.dump_k8s {
let client = match create_k8s_client(args.k8s_context.clone()).await {
Ok(client) => client,
Err(e) => {
error!("Failed to create k8s client: {}", e);
return Err(e);
}
};
if args.auto_port_forward {
let port_forwarder = create_kubectl_port_forwarder(&client, args).await?;
task::spawn(|| "port-forwarding", async move {
port_forwarder.port_forward().await;
});
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
let dumper = ContainerServiceDumper::new_k8s_dumper(
&context,
client,
args.k8s_namespaces.clone(),
args.k8s_context.clone(),
args.k8s_dump_secret_values,
);
Some(dumper)
} else {
None
}
}
DebugMode::Emulator(args) => {
if args.dump_docker {
let docker_container_id = args
.docker_container_id
.clone()
.expect("docker_container_id is required");
let dumper =
ContainerServiceDumper::new_docker_dumper(&context, docker_container_id);
Some(dumper)
} else {
None
}
}
};
if let Some(dumper) = container_system_dumper {
dumper.dump_container_resources().await;
}
let connection_url = match &args.debug_mode {
DebugMode::SelfManaged(args) => kubectl_port_forwarder::create_mz_connection_url(
args.port_forward_local_address.clone(),
args.port_forward_local_port,
args.mz_connection_url.clone(),
),
DebugMode::Emulator(args) => args.mz_connection_url.clone(),
};
if args.dump_system_catalog {
let catalog_dumper = match system_catalog_dumper::SystemCatalogDumper::new(
&context,
&connection_url,
)
.await
{
Ok(dumper) => Some(dumper),
Err(e) => {
error!("Failed to dump system catalog: {}", e);
None
}
};
if let Some(dumper) = catalog_dumper {
dumper.dump_all_relations().await;
}
}
info!("Zipping debug directory");
let base_path = format_base_path(context.start_time);
let zip_file_name = format!("{}.zip", &base_path.display());
if let Err(e) = zip_debug_folder(PathBuf::from(&zip_file_name), &base_path) {
error!("Failed to zip debug directory: {}", e);
} else {
info!("Created zip debug at {}", &zip_file_name);
}
Ok(())
}
async fn create_k8s_client(k8s_context: Option<String>) -> Result<Client, anyhow::Error> {
let kubeconfig_options = KubeConfigOptions {
context: k8s_context,
..Default::default()
};
let kubeconfig = Config::from_kubeconfig(&kubeconfig_options).await?;
let client = Client::try_from(kubeconfig)?;
Ok(client)
}