mz_storage/decode/
protobuf.rs
1use mz_interchange::protobuf::{DecodedDescriptors, Decoder};
11use mz_ore::error::ErrorExt;
12use mz_repr::Row;
13use mz_storage_types::errors::DecodeErrorKind;
14use mz_storage_types::sources::encoding::ProtobufEncoding;
15
16#[derive(Debug)]
17pub struct ProtobufDecoderState {
18 decoder: Decoder,
19 events_success: i64,
20 events_error: i64,
21}
22
23impl ProtobufDecoderState {
24 pub fn new(
25 ProtobufEncoding {
26 descriptors,
27 message_name,
28 confluent_wire_format,
29 }: ProtobufEncoding,
30 ) -> Result<Self, anyhow::Error> {
31 let descriptors = DecodedDescriptors::from_bytes(&descriptors, message_name)
32 .expect("descriptors provided to protobuf source are pre-validated");
33 Ok(ProtobufDecoderState {
34 decoder: Decoder::new(descriptors, confluent_wire_format)?,
35 events_success: 0,
36 events_error: 0,
37 })
38 }
39 pub fn get_value(&mut self, bytes: &[u8]) -> Option<Result<Row, DecodeErrorKind>> {
40 match self.decoder.decode(bytes) {
41 Ok(row) => {
42 if let Some(row) = row {
43 self.events_success += 1;
44 Some(Ok(row))
45 } else {
46 self.events_error += 1;
47 Some(Err(DecodeErrorKind::Text(
48 "protobuf deserialization returned None".into(),
49 )))
50 }
51 }
52 Err(err) => {
53 self.events_error += 1;
54 Some(Err(DecodeErrorKind::Text(
55 format!(
56 "protobuf deserialization error: {}",
57 err.display_with_causes()
58 )
59 .into(),
60 )))
61 }
62 }
63 }
64}