use mz_interchange::protobuf::{DecodedDescriptors, Decoder};
use mz_ore::error::ErrorExt;
use mz_repr::Row;
use mz_storage_types::errors::DecodeErrorKind;
use mz_storage_types::sources::encoding::ProtobufEncoding;
#[derive(Debug)]
pub struct ProtobufDecoderState {
decoder: Decoder,
events_success: i64,
events_error: i64,
}
impl ProtobufDecoderState {
pub fn new(
ProtobufEncoding {
descriptors,
message_name,
confluent_wire_format,
}: ProtobufEncoding,
) -> Result<Self, anyhow::Error> {
let descriptors = DecodedDescriptors::from_bytes(&descriptors, message_name)
.expect("descriptors provided to protobuf source are pre-validated");
Ok(ProtobufDecoderState {
decoder: Decoder::new(descriptors, confluent_wire_format)?,
events_success: 0,
events_error: 0,
})
}
pub fn get_value(&mut self, bytes: &[u8]) -> Option<Result<Row, DecodeErrorKind>> {
match self.decoder.decode(bytes) {
Ok(row) => {
if let Some(row) = row {
self.events_success += 1;
Some(Ok(row))
} else {
self.events_error += 1;
Some(Err(DecodeErrorKind::Text(
"protobuf deserialization returned None".into(),
)))
}
}
Err(err) => {
self.events_error += 1;
Some(Err(DecodeErrorKind::Text(
format!(
"protobuf deserialization error: {}",
err.display_with_causes()
)
.into(),
)))
}
}
}
}