1use mz_avro::schema::{SchemaPiece, SchemaPieceOrNamed};
11
12mod decode;
13mod encode;
14mod schema;
15
16pub use crate::avro::decode::{Decoder, DiffPair};
17pub use crate::avro::encode::{
18 AvroEncoder, AvroSchemaGenerator, DocTarget, encode_datums_as_avro,
19 encode_debezium_transaction_unchecked, get_debezium_transaction_schema,
20};
21pub use crate::avro::schema::{ConfluentAvroResolver, parse_schema, schema_to_relationdesc};
22
23fn is_null(schema: &SchemaPieceOrNamed) -> bool {
24 matches!(schema, SchemaPieceOrNamed::Piece(SchemaPiece::Null))
25}
26
27#[cfg(test)]
28mod tests {
29 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
30 use mz_avro::types::{DecimalValue, Value};
31 use mz_repr::adt::date::Date;
32 use mz_repr::adt::numeric::{self, NumericMaxScale};
33 use mz_repr::adt::timestamp::CheckedTimestamp;
34 use mz_repr::{Datum, RelationDesc, SqlScalarType};
35 use ordered_float::OrderedFloat;
36
37 use super::*;
38
39 #[mz_ore::test]
40 fn record_without_fields() -> anyhow::Result<()> {
41 let schema = r#"{
42 "type": "record",
43 "name": "test",
44 "fields": []
45 }"#;
46
47 let desc = schema_to_relationdesc(parse_schema(schema)?)?;
48 assert_eq!(desc.arity(), 0, "empty record produced rows");
49
50 Ok(())
51 }
52
53 #[mz_ore::test]
54 fn basic_record() -> anyhow::Result<()> {
55 let schema = r#"{
56 "type": "record",
57 "name": "test",
58 "fields": [
59 { "name": "f1", "type": "int" },
60 { "name": "f2", "type": "string" }
61 ]
62 }"#;
63
64 let desc = schema_to_relationdesc(parse_schema(schema)?)?;
65 let expected_desc = RelationDesc::builder()
66 .with_column("f1", SqlScalarType::Int32.nullable(false))
67 .with_column("f2", SqlScalarType::String.nullable(false))
68 .finish();
69
70 assert_eq!(desc, expected_desc);
71 Ok(())
72 }
73
74 #[mz_ore::test]
75 #[cfg_attr(miri, ignore)] fn test_diff_pair_to_avro_primitive_types() -> anyhow::Result<()> {
83 use numeric::Numeric;
84 let date = 365 * 50 + 20;
86 let date_time = NaiveDateTime::new(
87 NaiveDate::from_ymd_opt(2020, 1, 8).unwrap(),
88 NaiveTime::from_hms_opt(1, 1, 1).unwrap(),
89 );
90 let bytes: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10];
91 let string = String::from("test");
92
93 let valid_pairings = vec![
96 (SqlScalarType::Bool, Datum::True, Value::Boolean(true)),
97 (SqlScalarType::Bool, Datum::False, Value::Boolean(false)),
98 (SqlScalarType::Int32, Datum::Int32(1), Value::Int(1)),
99 (SqlScalarType::Int64, Datum::Int64(1), Value::Long(1)),
100 (
101 SqlScalarType::Float32,
102 Datum::Float32(OrderedFloat::from(1f32)),
103 Value::Float(1f32),
104 ),
105 (
106 SqlScalarType::Float64,
107 Datum::Float64(OrderedFloat::from(1f64)),
108 Value::Double(1f64),
109 ),
110 (
111 SqlScalarType::Date,
112 Datum::Date(Date::from_unix_epoch(date).unwrap()),
113 Value::Date(date),
114 ),
115 (
116 SqlScalarType::Timestamp { precision: None },
117 Datum::Timestamp(CheckedTimestamp::from_timestamplike(date_time).unwrap()),
118 Value::Timestamp(date_time),
119 ),
120 (
121 SqlScalarType::TimestampTz { precision: None },
122 Datum::TimestampTz(
123 CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
124 date_time, Utc,
125 ))
126 .unwrap(),
127 ),
128 Value::Timestamp(date_time),
129 ),
130 (
131 SqlScalarType::Numeric {
132 max_scale: Some(NumericMaxScale::try_from(1_i64)?),
133 },
134 Datum::from(Numeric::from(1)),
135 Value::Decimal(DecimalValue {
136 unscaled: bytes.clone(),
137 precision: 39,
138 scale: 1,
139 }),
140 ),
141 (
142 SqlScalarType::Numeric { max_scale: None },
143 Datum::from(Numeric::from(1)),
144 Value::Decimal(DecimalValue {
145 unscaled: vec![
147 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 240, 80, 254, 147, 137,
148 67, 172, 196, 95, 101, 86, 128, 0, 0, 0, 0,
149 ],
150 precision: 81,
151 scale: 39,
152 }),
153 ),
154 (
155 SqlScalarType::Bytes,
156 Datum::Bytes(&bytes),
157 Value::Bytes(bytes.clone()),
158 ),
159 (
160 SqlScalarType::String,
161 Datum::String(&string),
162 Value::String(string.clone()),
163 ),
164 ];
165 for (typ, datum, expected) in valid_pairings {
166 let desc = RelationDesc::builder()
167 .with_column("column1", typ.nullable(false))
168 .finish();
169 let schema_generator =
170 AvroSchemaGenerator::new(desc, false, Default::default(), "row", false, None, true)
171 .unwrap();
172 let avro_value =
173 encode_datums_as_avro(std::iter::once(datum), schema_generator.columns());
174 assert_eq!(
175 Value::Record(vec![("column1".into(), expected)]),
176 avro_value
177 );
178 }
179
180 Ok(())
181 }
182}