mz_interchange/
text_binary.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use bytes::BytesMut;
11use mz_repr::{ColumnName, ColumnType, RelationDesc};
12
13use crate::encode::{Encode, column_names_and_types};
14use crate::envelopes;
15
16#[derive(Debug)]
17pub struct TextEncoder {
18    columns: Vec<(ColumnName, ColumnType)>,
19}
20
21impl TextEncoder {
22    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
23        let mut columns = column_names_and_types(desc);
24        if debezium {
25            columns = envelopes::dbz_envelope(columns);
26        };
27        Self { columns }
28    }
29}
30
31impl Encode for TextEncoder {
32    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
33        let mut buf = BytesMut::new();
34        for ((_, typ), val) in self.columns.iter().zip(row.iter()) {
35            if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
36                pgrepr_value.encode_text(&mut buf);
37            }
38        }
39
40        buf.to_vec()
41    }
42}
43
44#[derive(Debug)]
45pub struct BinaryEncoder {
46    columns: Vec<(ColumnName, ColumnType)>,
47}
48
49impl BinaryEncoder {
50    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
51        let mut columns = column_names_and_types(desc);
52        if debezium {
53            columns = envelopes::dbz_envelope(columns);
54        };
55        Self { columns }
56    }
57}
58
59impl Encode for BinaryEncoder {
60    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
61        let mut buf = BytesMut::new();
62        for ((_, typ), val) in self.columns.iter().zip(row.iter()) {
63            if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
64                pgrepr_value
65                    .encode_binary(&mz_pgrepr::Type::from(&typ.scalar_type), &mut buf)
66                    .expect("encoding failed");
67            }
68        }
69
70        buf.to_vec()
71    }
72}