Skip to main content

mz_interchange/
avro.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use mz_avro::schema::{SchemaPiece, SchemaPieceOrNamed};
11
12mod decode;
13mod encode;
14mod schema;
15
16pub use crate::avro::decode::{Decoder, DiffPair};
17pub use crate::avro::encode::{
18    AvroEncoder, AvroSchemaGenerator, DocTarget, encode_datums_as_avro,
19    encode_debezium_transaction_unchecked, get_debezium_transaction_schema,
20};
21pub use crate::avro::schema::{
22    AvroSchemaResolver, WriterSchemaKey, WriterSchemaProvider, parse_schema, schema_to_relationdesc,
23};
24
25fn is_null(schema: &SchemaPieceOrNamed) -> bool {
26    matches!(schema, SchemaPieceOrNamed::Piece(SchemaPiece::Null))
27}
28
#[cfg(test)]
mod tests {
    use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
    use mz_avro::types::{DecimalValue, Value};
    use mz_repr::adt::date::Date;
    use mz_repr::adt::numeric::{self, NumericMaxScale};
    use mz_repr::adt::timestamp::CheckedTimestamp;
    use mz_repr::{Datum, RelationDesc, SqlScalarType};
    use ordered_float::OrderedFloat;

    use super::*;

    /// An Avro record with an empty `fields` list maps to a relation with no
    /// columns.
    #[mz_ore::test]
    fn record_without_fields() -> anyhow::Result<()> {
        let schema = r#"{
            "type": "record",
            "name": "test",
            "fields": []
        }"#;

        let desc = schema_to_relationdesc(parse_schema(schema, &[])?)?;
        // `arity` counts columns, so that is what the failure message reports.
        assert_eq!(desc.arity(), 0, "empty record produced columns");

        Ok(())
    }

    /// Each field of a simple Avro record becomes a non-nullable column of the
    /// corresponding SQL type, in field order.
    #[mz_ore::test]
    fn basic_record() -> anyhow::Result<()> {
        let schema = r#"{
            "type": "record",
            "name": "test",
            "fields": [
                { "name": "f1", "type": "int" },
                { "name": "f2", "type": "string" }
            ]
        }"#;

        let desc = schema_to_relationdesc(parse_schema(schema, &[])?)?;
        let expected_desc = RelationDesc::builder()
            .with_column("f1", SqlScalarType::Int32.nullable(false))
            .with_column("f2", SqlScalarType::String.nullable(false))
            .finish();

        assert_eq!(desc, expected_desc);
        Ok(())
    }

    /// Tests that a `Datum` of each listed SQL type is encoded as the expected
    /// Avro `Value` when placed in a single non-nullable column.
    ///
    /// The pairings cover the Avro primitive types, listed in the
    /// specification at
    /// <https://avro.apache.org/docs/++version++/specification/#primitive-types>,
    /// as well as the date, timestamp, and decimal values produced for `Date`,
    /// `Timestamp`/`TimestampTz`, and `Numeric` columns.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_diff_pair_to_avro_primitive_types() -> anyhow::Result<()> {
        use numeric::Numeric;
        // Data to be used later in assertions.
        // Days since the Unix epoch, fed to both `Date::from_unix_epoch` and
        // `Value::Date`.
        let date = 365 * 50 + 20;
        let date_time = NaiveDateTime::new(
            NaiveDate::from_ymd_opt(2020, 1, 8).unwrap(),
            NaiveTime::from_hms_opt(1, 1, 1).unwrap(),
        );
        // Reused both as a `Bytes` payload and as the big-endian unscaled
        // value 10 of the scale-1 decimal below.
        let bytes: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10];
        let string = String::from("test");

        // (SQL column type, datum to encode, expected Avro value).
        let valid_pairings = vec![
            (SqlScalarType::Bool, Datum::True, Value::Boolean(true)),
            (SqlScalarType::Bool, Datum::False, Value::Boolean(false)),
            (SqlScalarType::Int32, Datum::Int32(1), Value::Int(1)),
            (SqlScalarType::Int64, Datum::Int64(1), Value::Long(1)),
            (
                SqlScalarType::Float32,
                Datum::Float32(OrderedFloat::from(1f32)),
                Value::Float(1f32),
            ),
            (
                SqlScalarType::Float64,
                Datum::Float64(OrderedFloat::from(1f64)),
                Value::Double(1f64),
            ),
            (
                SqlScalarType::Date,
                Datum::Date(Date::from_unix_epoch(date).unwrap()),
                Value::Date(date),
            ),
            (
                SqlScalarType::Timestamp { precision: None },
                Datum::Timestamp(CheckedTimestamp::from_timestamplike(date_time).unwrap()),
                Value::Timestamp(date_time),
            ),
            // A UTC `TimestampTz` is expected to encode to the same naive
            // `Value::Timestamp` as the `Timestamp` case above.
            (
                SqlScalarType::TimestampTz { precision: None },
                Datum::TimestampTz(
                    CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
                        date_time, Utc,
                    ))
                    .unwrap(),
                ),
                Value::Timestamp(date_time),
            ),
            // The value 1 with max scale 1: unscaled 10 at scale 1.
            (
                SqlScalarType::Numeric {
                    max_scale: Some(NumericMaxScale::try_from(1_i64)?),
                },
                Datum::from(Numeric::from(1)),
                Value::Decimal(DecimalValue {
                    unscaled: bytes.clone(),
                    precision: 39,
                    scale: 1,
                }),
            ),
            // The value 1 with no max scale: unscaled 10^39 at scale 39.
            (
                SqlScalarType::Numeric { max_scale: None },
                Datum::from(Numeric::from(1)),
                Value::Decimal(DecimalValue {
                    // Big-endian encoding of 1E39.
                    unscaled: vec![
                        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 240, 80, 254, 147, 137,
                        67, 172, 196, 95, 101, 86, 128, 0, 0, 0, 0,
                    ],
                    precision: 81,
                    scale: 39,
                }),
            ),
            (
                SqlScalarType::Bytes,
                Datum::Bytes(&bytes),
                Value::Bytes(bytes.clone()),
            ),
            (
                SqlScalarType::String,
                Datum::String(&string),
                Value::String(string.clone()),
            ),
        ];
        for (typ, datum, expected) in valid_pairings {
            // Formatted before `typ` and `datum` are moved, so a failure
            // identifies which pairing went wrong.
            let context = format!("encoding {datum:?} as {typ:?}");
            let desc = RelationDesc::builder()
                .with_column("column1", typ.nullable(false))
                .finish();
            let schema_generator =
                AvroSchemaGenerator::new(desc, false, Default::default(), "row", false, None, true)
                    .unwrap();
            let avro_value =
                encode_datums_as_avro(std::iter::once(datum), schema_generator.columns());
            assert_eq!(
                Value::Record(vec![("column1".into(), expected)]),
                avro_value,
                "{context}"
            );
        }

        Ok(())
    }
}