avro_decode/
avro-decode.rsuse std::path::PathBuf;
use std::process;
use anyhow::Context;
use mz_interchange::avro::Decoder;
use mz_interchange::confluent;
use mz_ore::cli;
use mz_ore::cli::CliConfig;
use mz_ore::error::ErrorExt;
use tokio::fs;
#[derive(clap::Parser)]
struct Args {
data_file: PathBuf,
schema_file: PathBuf,
#[clap(long)]
confluent_wire_format: bool,
}
#[tokio::main]
async fn main() {
let args: Args = cli::parse_args(CliConfig::default());
if let Err(e) = run(args).await {
println!("{}", e.display_with_causes());
process::exit(1);
}
}
async fn run(args: Args) -> Result<(), anyhow::Error> {
let mut data = &*fs::read(&args.data_file)
.await
.context("reading data file")?;
if args.confluent_wire_format {
let (schema_id, adjusted_data) = confluent::extract_avro_header(data)?;
data = adjusted_data;
println!("schema id: {schema_id}");
}
let schema = fs::read_to_string(&args.schema_file)
.await
.context("reading schema file")?;
let ccsr_client: Option<mz_ccsr::Client> = None;
let debug_name = String::from("avro-decode");
let confluent_wire_format = false;
let mut decoder = Decoder::new(&schema, ccsr_client, debug_name, confluent_wire_format)
.context("creating decoder")?;
let row = decoder.decode(&mut data).await.context("decoding data")?;
println!("row: {row:?}");
Ok(())
}