Skip to main content

mz_storage/decode/
avro.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_interchange::avro::Decoder;
11use mz_ore::error::ErrorExt;
12use mz_repr::Row;
13use mz_storage_types::errors::DecodeErrorKind;
14
15#[derive(Debug)]
16pub struct AvroDecoderState {
17    decoder: Decoder,
18    events_success: i64,
19}
20
21impl AvroDecoderState {
22    pub fn new(
23        value_schema: &str,
24        reference_schemas: &[String],
25        ccsr_client: Option<mz_ccsr::Client>,
26        debug_name: String,
27        confluent_wire_format: bool,
28    ) -> Result<Self, anyhow::Error> {
29        Ok(AvroDecoderState {
30            decoder: Decoder::new(
31                value_schema,
32                reference_schemas,
33                ccsr_client,
34                debug_name,
35                confluent_wire_format,
36            )?,
37            events_success: 0,
38        })
39    }
40
41    pub async fn decode(
42        &mut self,
43        bytes: &mut &[u8],
44    ) -> Result<Result<Option<Row>, DecodeErrorKind>, anyhow::Error> {
45        let result = match self.decoder.decode(bytes).await? {
46            Ok(row) => {
47                self.events_success += 1;
48                Ok(Some(row))
49            }
50            Err(err) => Err(DecodeErrorKind::Text(
51                format!("avro deserialization error: {}", err.display_with_causes()).into(),
52            )),
53        };
54        Ok(result)
55    }
56}