mz_storage/decode/
protobuf.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_interchange::protobuf::{DecodedDescriptors, Decoder};
11use mz_ore::error::ErrorExt;
12use mz_repr::Row;
13use mz_storage_types::errors::DecodeErrorKind;
14use mz_storage_types::sources::encoding::ProtobufEncoding;
15
16#[derive(Debug)]
17pub struct ProtobufDecoderState {
18    decoder: Decoder,
19    events_success: i64,
20    events_error: i64,
21}
22
23impl ProtobufDecoderState {
24    pub fn new(
25        ProtobufEncoding {
26            descriptors,
27            message_name,
28            confluent_wire_format,
29        }: ProtobufEncoding,
30    ) -> Result<Self, anyhow::Error> {
31        let descriptors = DecodedDescriptors::from_bytes(&descriptors, message_name)
32            .expect("descriptors provided to protobuf source are pre-validated");
33        Ok(ProtobufDecoderState {
34            decoder: Decoder::new(descriptors, confluent_wire_format)?,
35            events_success: 0,
36            events_error: 0,
37        })
38    }
39    pub fn get_value(&mut self, bytes: &[u8]) -> Option<Result<Row, DecodeErrorKind>> {
40        match self.decoder.decode(bytes) {
41            Ok(row) => {
42                if let Some(row) = row {
43                    self.events_success += 1;
44                    Some(Ok(row))
45                } else {
46                    self.events_error += 1;
47                    Some(Err(DecodeErrorKind::Text(
48                        "protobuf deserialization returned None".into(),
49                    )))
50                }
51            }
52            Err(err) => {
53                self.events_error += 1;
54                Some(Err(DecodeErrorKind::Text(
55                    format!(
56                        "protobuf deserialization error: {}",
57                        err.display_with_causes()
58                    )
59                    .into(),
60                )))
61            }
62        }
63    }
64}