mz_interchange/
avro.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_avro::schema::{SchemaPiece, SchemaPieceOrNamed};
11
12mod decode;
13mod encode;
14mod schema;
15
16pub use crate::avro::decode::{Decoder, DiffPair};
17pub use crate::avro::encode::{
18    AvroEncoder, AvroSchemaGenerator, DocTarget, encode_datums_as_avro,
19    encode_debezium_transaction_unchecked, get_debezium_transaction_schema,
20};
21pub use crate::avro::schema::{ConfluentAvroResolver, parse_schema, schema_to_relationdesc};
22
23fn is_null(schema: &SchemaPieceOrNamed) -> bool {
24    matches!(schema, SchemaPieceOrNamed::Piece(SchemaPiece::Null))
25}
26
#[cfg(test)]
mod tests {
    use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
    use mz_avro::types::{DecimalValue, Value};
    use mz_repr::adt::date::Date;
    use mz_repr::adt::numeric::{self, NumericMaxScale};
    use mz_repr::adt::timestamp::CheckedTimestamp;
    use mz_repr::{Datum, RelationDesc, ScalarType};
    use ordered_float::OrderedFloat;

    use super::*;

    #[mz_ore::test]
    /// A record schema whose `fields` list is empty must translate into a
    /// relation description with no columns.
    fn record_without_fields() -> anyhow::Result<()> {
        let schema = r#"{
            "type": "record",
            "name": "test",
            "fields": []
        }"#;

        let desc = schema_to_relationdesc(parse_schema(schema)?)?;
        // `arity` counts the columns of the description, so the failure
        // message talks about columns rather than rows.
        assert_eq!(desc.arity(), 0, "empty record produced columns");

        Ok(())
    }

    #[mz_ore::test]
    /// Non-nullable Avro `int` and `string` fields translate into non-nullable
    /// `Int32` and `String` columns, in declaration order.
    fn basic_record() -> anyhow::Result<()> {
        let schema = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                { "name": "f1", "type": "int" },
                { "name": "f2", "type": "string" }
            ]
        }"#;

        let desc = schema_to_relationdesc(parse_schema(schema)?)?;
        let expected_desc = RelationDesc::builder()
            .with_column("f1", ScalarType::Int32.nullable(false))
            .with_column("f2", ScalarType::String.nullable(false))
            .finish();

        assert_eq!(desc, expected_desc);
        Ok(())
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    /// Test that primitive Avro schema types allow Datums to be correctly
    /// serialized into Avro Values.
    ///
    /// The primitive types exercised here are the ones listed in the Avro
    /// specification:
    /// https://avro.apache.org/docs/current/spec.html#schemas
    fn test_diff_pair_to_avro_primitive_types() -> anyhow::Result<()> {
        use numeric::Numeric;

        // Day offset passed to `Date::from_unix_epoch`; the encoder is expected
        // to emit the same offset as `Value::Date`.
        let date = 365 * 50 + 20;
        let date_time = NaiveDateTime::new(
            NaiveDate::from_ymd_opt(2020, 1, 8).unwrap(),
            NaiveTime::from_hms_opt(1, 1, 1).unwrap(),
        );
        // Unscaled integer `10` in the final byte; at scale 1 this is the
        // numeric `1`, as expected for `Numeric { max_scale: Some(1) }`.
        let unscaled_one_at_scale_1: Vec<u8> =
            vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10];
        // Arbitrary payload for the `Bytes` pairing; it must come back unchanged.
        // Kept independent of the decimal representation above.
        let bytes: Vec<u8> = vec![0, 1, 2, 127, 128, 255];
        let string = String::from("test");

        // Simple transformations from primitive Avro Schema types
        // to Avro Values.
        let valid_pairings = vec![
            (ScalarType::Bool, Datum::True, Value::Boolean(true)),
            (ScalarType::Bool, Datum::False, Value::Boolean(false)),
            (ScalarType::Int32, Datum::Int32(1), Value::Int(1)),
            (ScalarType::Int64, Datum::Int64(1), Value::Long(1)),
            (
                ScalarType::Float32,
                Datum::Float32(OrderedFloat::from(1f32)),
                Value::Float(1f32),
            ),
            (
                ScalarType::Float64,
                Datum::Float64(OrderedFloat::from(1f64)),
                Value::Double(1f64),
            ),
            (
                ScalarType::Date,
                Datum::Date(Date::from_unix_epoch(date).unwrap()),
                Value::Date(date),
            ),
            (
                ScalarType::Timestamp { precision: None },
                Datum::Timestamp(CheckedTimestamp::from_timestamplike(date_time).unwrap()),
                Value::Timestamp(date_time),
            ),
            (
                // A UTC `TimestampTz` is expected to encode as the same naive
                // timestamp value.
                ScalarType::TimestampTz { precision: None },
                Datum::TimestampTz(
                    CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                        date_time, Utc,
                    ))
                    .unwrap(),
                ),
                Value::Timestamp(date_time),
            ),
            (
                ScalarType::Numeric {
                    max_scale: Some(NumericMaxScale::try_from(1_i64)?),
                },
                Datum::from(Numeric::from(1)),
                Value::Decimal(DecimalValue {
                    unscaled: unscaled_one_at_scale_1,
                    precision: 39,
                    scale: 1,
                }),
            ),
            (
                ScalarType::Numeric { max_scale: None },
                Datum::from(Numeric::from(1)),
                Value::Decimal(DecimalValue {
                    // equivalent to 1E39
                    unscaled: vec![
                        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 240, 80, 254, 147, 137,
                        67, 172, 196, 95, 101, 86, 128, 0, 0, 0, 0,
                    ],
                    precision: 81,
                    scale: 39,
                }),
            ),
            (
                ScalarType::Bytes,
                Datum::Bytes(&bytes),
                Value::Bytes(bytes.clone()),
            ),
            (
                ScalarType::String,
                Datum::String(&string),
                Value::String(string.clone()),
            ),
        ];
        for (typ, datum, expected) in valid_pairings {
            // Describe the case before `typ` is handed to `nullable`, so that a
            // failing assertion names the pairing that broke.
            let case = format!("{typ:?} <- {datum:?}");
            let desc = RelationDesc::builder()
                .with_column("column1", typ.nullable(false))
                .finish();
            let schema_generator =
                AvroSchemaGenerator::new(desc, false, Default::default(), "row", false, None, true)
                    .unwrap();
            let avro_value =
                encode_datums_as_avro(std::iter::once(datum), schema_generator.columns());
            assert_eq!(
                Value::Record(vec![("column1".into(), expected)]),
                avro_value,
                "unexpected Avro encoding for {case}"
            );
        }

        Ok(())
    }
}