mz_interchange/
text_binary.rs
1use bytes::BytesMut;
11use mz_repr::{ColumnName, ColumnType, RelationDesc};
12
13use crate::encode::{Encode, column_names_and_types};
14use crate::envelopes;
15
16#[derive(Debug)]
17pub struct TextEncoder {
18 columns: Vec<(ColumnName, ColumnType)>,
19}
20
21impl TextEncoder {
22 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
23 let mut columns = column_names_and_types(desc);
24 if debezium {
25 columns = envelopes::dbz_envelope(columns);
26 };
27 Self { columns }
28 }
29}
30
31impl Encode for TextEncoder {
32 fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
33 let mut buf = BytesMut::new();
34 for ((_, typ), val) in self.columns.iter().zip(row.iter()) {
35 if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
36 pgrepr_value.encode_text(&mut buf);
37 }
38 }
39
40 buf.to_vec()
41 }
42}
43
44#[derive(Debug)]
45pub struct BinaryEncoder {
46 columns: Vec<(ColumnName, ColumnType)>,
47}
48
49impl BinaryEncoder {
50 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
51 let mut columns = column_names_and_types(desc);
52 if debezium {
53 columns = envelopes::dbz_envelope(columns);
54 };
55 Self { columns }
56 }
57}
58
59impl Encode for BinaryEncoder {
60 fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
61 let mut buf = BytesMut::new();
62 for ((_, typ), val) in self.columns.iter().zip(row.iter()) {
63 if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
64 pgrepr_value
65 .encode_binary(&mz_pgrepr::Type::from(&typ.scalar_type), &mut buf)
66 .expect("encoding failed");
67 }
68 }
69
70 buf.to_vec()
71 }
72}