1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use dataflow_types::{sources::encoding::ProtobufEncoding, DecodeError};
use interchange::protobuf::{DecodedDescriptors, Decoder};
use repr::Row;
/// Decoding state for a protobuf-encoded source: wraps the underlying
/// [`Decoder`] and keeps running tallies of decode outcomes.
#[derive(Debug)]
pub struct ProtobufDecoderState {
/// Decoder that turns raw message bytes into rows.
decoder: Decoder,
/// Number of messages that decoded to a row.
events_success: i64,
/// Number of messages that failed to decode, including decodes that
/// completed without producing a row.
events_error: i64,
}
impl ProtobufDecoderState {
    /// Builds the decoding state from a protobuf source encoding.
    ///
    /// The encoding is destructured into its serialized `descriptors`, the
    /// `message_name` to decode, and the `confluent_wire_format` flag, which
    /// are handed to [`DecodedDescriptors::from_bytes`] and [`Decoder::new`].
    /// Both event counters start at zero.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Decoder::new`].
    ///
    /// # Panics
    ///
    /// Panics if the descriptors cannot be decoded; callers are expected to
    /// have validated them before constructing the source.
    pub fn new(
        ProtobufEncoding {
            descriptors,
            message_name,
            confluent_wire_format,
        }: ProtobufEncoding,
    ) -> Result<Self, anyhow::Error> {
        let descriptors = DecodedDescriptors::from_bytes(&descriptors, message_name)
            .expect("descriptors provided to protobuf source are pre-validated");
        Ok(ProtobufDecoderState {
            decoder: Decoder::new(descriptors, confluent_wire_format)?,
            events_success: 0,
            events_error: 0,
        })
    }

    /// Decodes one protobuf message from `bytes` into a [`Row`].
    ///
    /// The asynchronous decoder is driven to completion on the calling
    /// thread. Every call bumps exactly one counter: `events_success` when a
    /// row is produced, `events_error` otherwise.
    ///
    /// This method always returns `Some`; failures are reported as
    /// `Some(Err(DecodeError::Text(..)))`, both when the decoder returns an
    /// error and when it completes without yielding a row.
    pub fn get_value(&mut self, bytes: &[u8]) -> Option<Result<Row, DecodeError>> {
        use futures::executor::block_on;

        let outcome = match block_on(self.decoder.decode(bytes)) {
            Ok(Some(row)) => {
                self.events_success += 1;
                Ok(row)
            }
            // The decoder finished without an error but also without a row;
            // surface that to the caller as a decode failure.
            Ok(None) => {
                self.events_error += 1;
                Err(DecodeError::Text(
                    "protobuf deserialization returned None".to_string(),
                ))
            }
            Err(err) => {
                self.events_error += 1;
                // `{:#}` is the alternate `Display` form of the error.
                Err(DecodeError::Text(format!(
                    "protobuf deserialization error: {:#}",
                    err
                )))
            }
        };
        Some(outcome)
    }
}