mz_storage/decode/
avro.rs1use mz_interchange::avro::Decoder;
11use mz_ore::error::ErrorExt;
12use mz_repr::Row;
13use mz_storage_types::errors::DecodeErrorKind;
14
15#[derive(Debug)]
16pub struct AvroDecoderState {
17 decoder: Decoder,
18 events_success: i64,
19}
20
21impl AvroDecoderState {
22 pub fn new(
23 value_schema: &str,
24 reference_schemas: &[String],
25 ccsr_client: Option<mz_ccsr::Client>,
26 debug_name: String,
27 confluent_wire_format: bool,
28 ) -> Result<Self, anyhow::Error> {
29 Ok(AvroDecoderState {
30 decoder: Decoder::new(
31 value_schema,
32 reference_schemas,
33 ccsr_client,
34 debug_name,
35 confluent_wire_format,
36 )?,
37 events_success: 0,
38 })
39 }
40
41 pub async fn decode(
42 &mut self,
43 bytes: &mut &[u8],
44 ) -> Result<Result<Option<Row>, DecodeErrorKind>, anyhow::Error> {
45 let result = match self.decoder.decode(bytes).await? {
46 Ok(row) => {
47 self.events_success += 1;
48 Ok(Some(row))
49 }
50 Err(err) => Err(DecodeErrorKind::Text(
51 format!("avro deserialization error: {}", err.display_with_causes()).into(),
52 )),
53 };
54 Ok(result)
55 }
56}