1use mz_avro::schema::{SchemaPiece, SchemaPieceOrNamed};
11
12mod decode;
13mod encode;
14mod schema;
15
16pub use crate::avro::decode::{Decoder, DiffPair};
17pub use crate::avro::encode::{
18 AvroEncoder, AvroSchemaGenerator, DocTarget, encode_datums_as_avro,
19 encode_debezium_transaction_unchecked, get_debezium_transaction_schema,
20};
21pub use crate::avro::schema::{
22 AvroSchemaResolver, WriterSchemaKey, WriterSchemaProvider, parse_schema, schema_to_relationdesc,
23};
24
25fn is_null(schema: &SchemaPieceOrNamed) -> bool {
26 matches!(schema, SchemaPieceOrNamed::Piece(SchemaPiece::Null))
27}
28
29#[cfg(test)]
30mod tests {
31 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
32 use mz_avro::types::{DecimalValue, Value};
33 use mz_repr::adt::date::Date;
34 use mz_repr::adt::numeric::{self, NumericMaxScale};
35 use mz_repr::adt::timestamp::CheckedTimestamp;
36 use mz_repr::{Datum, RelationDesc, SqlScalarType};
37 use ordered_float::OrderedFloat;
38
39 use super::*;
40
41 #[mz_ore::test]
42 fn record_without_fields() -> anyhow::Result<()> {
43 let schema = r#"{
44 "type": "record",
45 "name": "test",
46 "fields": []
47 }"#;
48
49 let desc = schema_to_relationdesc(parse_schema(schema, &[])?)?;
50 assert_eq!(desc.arity(), 0, "empty record produced rows");
51
52 Ok(())
53 }
54
55 #[mz_ore::test]
56 fn basic_record() -> anyhow::Result<()> {
57 let schema = r#"{
58 "type": "record",
59 "name": "test",
60 "fields": [
61 { "name": "f1", "type": "int" },
62 { "name": "f2", "type": "string" }
63 ]
64 }"#;
65
66 let desc = schema_to_relationdesc(parse_schema(schema, &[])?)?;
67 let expected_desc = RelationDesc::builder()
68 .with_column("f1", SqlScalarType::Int32.nullable(false))
69 .with_column("f2", SqlScalarType::String.nullable(false))
70 .finish();
71
72 assert_eq!(desc, expected_desc);
73 Ok(())
74 }
75
76 #[mz_ore::test]
77 #[cfg_attr(miri, ignore)] fn test_diff_pair_to_avro_primitive_types() -> anyhow::Result<()> {
85 use numeric::Numeric;
86 let date = 365 * 50 + 20;
88 let date_time = NaiveDateTime::new(
89 NaiveDate::from_ymd_opt(2020, 1, 8).unwrap(),
90 NaiveTime::from_hms_opt(1, 1, 1).unwrap(),
91 );
92 let bytes: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10];
93 let string = String::from("test");
94
95 let valid_pairings = vec![
98 (SqlScalarType::Bool, Datum::True, Value::Boolean(true)),
99 (SqlScalarType::Bool, Datum::False, Value::Boolean(false)),
100 (SqlScalarType::Int32, Datum::Int32(1), Value::Int(1)),
101 (SqlScalarType::Int64, Datum::Int64(1), Value::Long(1)),
102 (
103 SqlScalarType::Float32,
104 Datum::Float32(OrderedFloat::from(1f32)),
105 Value::Float(1f32),
106 ),
107 (
108 SqlScalarType::Float64,
109 Datum::Float64(OrderedFloat::from(1f64)),
110 Value::Double(1f64),
111 ),
112 (
113 SqlScalarType::Date,
114 Datum::Date(Date::from_unix_epoch(date).unwrap()),
115 Value::Date(date),
116 ),
117 (
118 SqlScalarType::Timestamp { precision: None },
119 Datum::Timestamp(CheckedTimestamp::from_timestamplike(date_time).unwrap()),
120 Value::Timestamp(date_time),
121 ),
122 (
123 SqlScalarType::TimestampTz { precision: None },
124 Datum::TimestampTz(
125 CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
126 date_time, Utc,
127 ))
128 .unwrap(),
129 ),
130 Value::Timestamp(date_time),
131 ),
132 (
133 SqlScalarType::Numeric {
134 max_scale: Some(NumericMaxScale::try_from(1_i64)?),
135 },
136 Datum::from(Numeric::from(1)),
137 Value::Decimal(DecimalValue {
138 unscaled: bytes.clone(),
139 precision: 39,
140 scale: 1,
141 }),
142 ),
143 (
144 SqlScalarType::Numeric { max_scale: None },
145 Datum::from(Numeric::from(1)),
146 Value::Decimal(DecimalValue {
147 unscaled: vec![
149 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 240, 80, 254, 147, 137,
150 67, 172, 196, 95, 101, 86, 128, 0, 0, 0, 0,
151 ],
152 precision: 81,
153 scale: 39,
154 }),
155 ),
156 (
157 SqlScalarType::Bytes,
158 Datum::Bytes(&bytes),
159 Value::Bytes(bytes.clone()),
160 ),
161 (
162 SqlScalarType::String,
163 Datum::String(&string),
164 Value::String(string.clone()),
165 ),
166 ];
167 for (typ, datum, expected) in valid_pairings {
168 let desc = RelationDesc::builder()
169 .with_column("column1", typ.nullable(false))
170 .finish();
171 let schema_generator =
172 AvroSchemaGenerator::new(desc, false, Default::default(), "row", false, None, true)
173 .unwrap();
174 let avro_value =
175 encode_datums_as_avro(std::iter::once(datum), schema_generator.columns());
176 assert_eq!(
177 Value::Record(vec![("column1".into(), expected)]),
178 avro_value
179 );
180 }
181
182 Ok(())
183 }
184}