mz_interchange/
text_binary.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use bytes::BytesMut;
11use itertools::Itertools;
12use mz_repr::{ColumnName, ColumnType, RelationDesc};
13
14use crate::encode::{Encode, column_names_and_types};
15use crate::envelopes;
16
17#[derive(Debug)]
18pub struct TextEncoder {
19    columns: Vec<(ColumnName, ColumnType)>,
20}
21
22impl TextEncoder {
23    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
24        let mut columns = column_names_and_types(desc);
25        if debezium {
26            columns = envelopes::dbz_envelope(columns);
27        };
28        Self { columns }
29    }
30}
31
32impl Encode for TextEncoder {
33    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
34        let mut buf = BytesMut::new();
35        for ((_, typ), val) in self.columns.iter().zip_eq(row.iter()) {
36            if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
37                pgrepr_value.encode_text(&mut buf);
38            }
39        }
40
41        buf.to_vec()
42    }
43}
44
45#[derive(Debug)]
46pub struct BinaryEncoder {
47    columns: Vec<(ColumnName, ColumnType)>,
48}
49
50impl BinaryEncoder {
51    pub fn new(desc: RelationDesc, debezium: bool) -> Self {
52        let mut columns = column_names_and_types(desc);
53        if debezium {
54            columns = envelopes::dbz_envelope(columns);
55        };
56        Self { columns }
57    }
58}
59
60impl Encode for BinaryEncoder {
61    fn encode_unchecked(&self, row: mz_repr::Row) -> Vec<u8> {
62        let mut buf = BytesMut::new();
63        for ((_, typ), val) in self.columns.iter().zip_eq(row.iter()) {
64            if let Some(pgrepr_value) = mz_pgrepr::Value::from_datum(val, &typ.scalar_type) {
65                pgrepr_value
66                    .encode_binary(&mz_pgrepr::Type::from(&typ.scalar_type), &mut buf)
67                    .expect("encoding failed");
68            }
69        }
70
71        buf.to_vec()
72    }
73}