mz_interchange/
text_binary.rs1use bytes::BytesMut;
11use itertools::Itertools;
12use mz_repr::{ColumnName, ColumnType, RelationDesc};
13
14use crate::encode::{Encode, column_names_and_types};
15use crate::envelopes;
16
17#[derive(Debug)]
18pub struct TextEncoder {
19 columns: Vec<(ColumnName, ColumnType)>,
20}
21
22impl TextEncoder {
23 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
24 let mut columns = column_names_and_types(desc);
25 if debezium {
26 columns = envelopes::dbz_envelope(columns);
27 };
28 Self { columns }
29 }
30}
31
32impl Encode for TextEncoder {
33 fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
34 let mut buf = BytesMut::new();
35 for ((_, typ), val) in self.columns.iter().zip_eq(row.iter()) {
36 if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
37 pgrepr_value.encode_text(&mut buf);
38 }
39 }
40
41 buf.to_vec()
42 }
43}
44
45#[derive(Debug)]
46pub struct BinaryEncoder {
47 columns: Vec<(ColumnName, ColumnType)>,
48}
49
50impl BinaryEncoder {
51 pub fn new(desc: RelationDesc, debezium: bool) -> Self {
52 let mut columns = column_names_and_types(desc);
53 if debezium {
54 columns = envelopes::dbz_envelope(columns);
55 };
56 Self { columns }
57 }
58}
59
60impl Encode for BinaryEncoder {
61 fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
62 let mut buf = BytesMut::new();
63 for ((_, typ), val) in self.columns.iter().zip_eq(row.iter()) {
64 if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
65 pgrepr_value
66 .encode_binary(&mz_pgrepr::Type::from(&typ.scalar_type), &mut buf)
67 .expect("encoding failed");
68 }
69 }
70
71 buf.to_vec()
72 }
73}