1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use mz_interchange::avro::Decoder;
11use mz_ore::error::ErrorExt;
12use mz_repr::Row;
13use mz_storage_types::errors::DecodeErrorKind;
1415#[derive(Debug)]
16pub struct AvroDecoderState {
17 decoder: Decoder,
18 events_success: i64,
19}
2021impl AvroDecoderState {
22pub fn new(
23 value_schema: &str,
24 ccsr_client: Option<mz_ccsr::Client>,
25 debug_name: String,
26 confluent_wire_format: bool,
27 ) -> Result<Self, anyhow::Error> {
28Ok(AvroDecoderState {
29 decoder: Decoder::new(value_schema, ccsr_client, debug_name, confluent_wire_format)?,
30 events_success: 0,
31 })
32 }
3334pub async fn decode(
35&mut self,
36 bytes: &mut &[u8],
37 ) -> Result<Result<Option<Row>, DecodeErrorKind>, anyhow::Error> {
38let result = match self.decoder.decode(bytes).await? {
39Ok(row) => {
40self.events_success += 1;
41Ok(Some(row))
42 }
43Err(err) => Err(DecodeErrorKind::Text(
44format!("avro deserialization error: {}", err.display_with_causes()).into(),
45 )),
46 };
47Ok(result)
48 }
49}