avro_decode/
avro-decode.rs1use std::path::PathBuf;
11use std::process;
12
13use anyhow::Context;
14use mz_interchange::avro::Decoder;
15use mz_interchange::confluent;
16use mz_ore::cli;
17use mz_ore::cli::CliConfig;
18use mz_ore::error::ErrorExt;
19use tokio::fs;
20
21#[derive(clap::Parser)]
23struct Args {
24 data_file: PathBuf,
26 schema_file: PathBuf,
28 #[clap(long)]
30 confluent_wire_format: bool,
31}
32
33#[tokio::main]
34async fn main() {
35 let args: Args = cli::parse_args(CliConfig::default());
36 if let Err(e) = run(args).await {
37 println!("{}", e.display_with_causes());
38 process::exit(1);
39 }
40}
41
42async fn run(args: Args) -> Result<(), anyhow::Error> {
43 let mut data = &*fs::read(&args.data_file)
44 .await
45 .context("reading data file")?;
46 if args.confluent_wire_format {
47 let (schema_id, adjusted_data) = confluent::extract_avro_header(data)?;
48 data = adjusted_data;
49 println!("schema id: {schema_id}");
50 }
51 let schema = fs::read_to_string(&args.schema_file)
52 .await
53 .context("reading schema file")?;
54 let ccsr_client: Option<mz_ccsr::Client> = None;
55 let debug_name = String::from("avro-decode");
56 let confluent_wire_format = false;
57 let mut decoder = Decoder::new(&schema, ccsr_client, debug_name, confluent_wire_format)
58 .context("creating decoder")?;
59 let row = decoder.decode(&mut data).await.context("decoding data")?;
60 println!("row: {row:?}");
61 Ok(())
62}