1use mz_avro::schema::{SchemaPiece, SchemaPieceOrNamed};
11
12mod decode;
13mod encode;
14mod schema;
15
16pub use crate::avro::decode::{Decoder, DiffPair};
17pub use crate::avro::encode::{
18 AvroEncoder, AvroSchemaGenerator, DocTarget, encode_datums_as_avro,
19 encode_debezium_transaction_unchecked, get_debezium_transaction_schema,
20};
21pub use crate::avro::schema::{ConfluentAvroResolver, parse_schema, schema_to_relationdesc};
22
23fn is_null(schema: &SchemaPieceOrNamed) -> bool {
24 matches!(schema, SchemaPieceOrNamed::Piece(SchemaPiece::Null))
25}
26
27#[cfg(test)]
28mod tests {
29 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
30 use mz_avro::types::{DecimalValue, Value};
31 use mz_repr::adt::date::Date;
32 use mz_repr::adt::numeric::{self, NumericMaxScale};
33 use mz_repr::adt::timestamp::CheckedTimestamp;
34 use mz_repr::{Datum, RelationDesc, ScalarType};
35 use ordered_float::OrderedFloat;
36
37 use super::*;
38
39 #[mz_ore::test]
40 fn record_without_fields() -> anyhow::Result<()> {
41 let schema = r#"{
42 "type": "record",
43 "name": "test",
44 "fields": []
45 }"#;
46
47 let desc = schema_to_relationdesc(parse_schema(schema)?)?;
48 assert_eq!(desc.arity(), 0, "empty record produced rows");
49
50 Ok(())
51 }
52
53 #[mz_ore::test]
54 fn basic_record() -> anyhow::Result<()> {
55 let schema = r#"{
56 "type": "record",
57 "name": "test",
58 "fields": [
59 { "name": "f1", "type": "int" },
60 { "name": "f2", "type": "string" }
61 ]
62 }"#;
63
64 let desc = schema_to_relationdesc(parse_schema(schema)?)?;
65 let expected_desc = RelationDesc::builder()
66 .with_column("f1", ScalarType::Int32.nullable(false))
67 .with_column("f2", ScalarType::String.nullable(false))
68 .finish();
69
70 assert_eq!(desc, expected_desc);
71 Ok(())
72 }
73
74 #[mz_ore::test]
75 #[cfg_attr(miri, ignore)] fn test_diff_pair_to_avro_primitive_types() -> anyhow::Result<()> {
83 use numeric::Numeric;
84 let date = 365 * 50 + 20;
86 let date_time = NaiveDateTime::new(
87 NaiveDate::from_ymd_opt(2020, 1, 8).unwrap(),
88 NaiveTime::from_hms_opt(1, 1, 1).unwrap(),
89 );
90 let bytes: Vec<u8> = vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10];
91 let string = String::from("test");
92
93 let valid_pairings = vec![
96 (ScalarType::Bool, Datum::True, Value::Boolean(true)),
97 (ScalarType::Bool, Datum::False, Value::Boolean(false)),
98 (ScalarType::Int32, Datum::Int32(1), Value::Int(1)),
99 (ScalarType::Int64, Datum::Int64(1), Value::Long(1)),
100 (
101 ScalarType::Float32,
102 Datum::Float32(OrderedFloat::from(1f32)),
103 Value::Float(1f32),
104 ),
105 (
106 ScalarType::Float64,
107 Datum::Float64(OrderedFloat::from(1f64)),
108 Value::Double(1f64),
109 ),
110 (
111 ScalarType::Date,
112 Datum::Date(Date::from_unix_epoch(date).unwrap()),
113 Value::Date(date),
114 ),
115 (
116 ScalarType::Timestamp { precision: None },
117 Datum::Timestamp(CheckedTimestamp::from_timestamplike(date_time).unwrap()),
118 Value::Timestamp(date_time),
119 ),
120 (
121 ScalarType::TimestampTz { precision: None },
122 Datum::TimestampTz(
123 CheckedTimestamp::from_timestamplike(DateTime::from_naive_utc_and_offset(
124 date_time, Utc,
125 ))
126 .unwrap(),
127 ),
128 Value::Timestamp(date_time),
129 ),
130 (
131 ScalarType::Numeric {
132 max_scale: Some(NumericMaxScale::try_from(1_i64)?),
133 },
134 Datum::from(Numeric::from(1)),
135 Value::Decimal(DecimalValue {
136 unscaled: bytes.clone(),
137 precision: 39,
138 scale: 1,
139 }),
140 ),
141 (
142 ScalarType::Numeric { max_scale: None },
143 Datum::from(Numeric::from(1)),
144 Value::Decimal(DecimalValue {
145 unscaled: vec![
147 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 240, 80, 254, 147, 137,
148 67, 172, 196, 95, 101, 86, 128, 0, 0, 0, 0,
149 ],
150 precision: 81,
151 scale: 39,
152 }),
153 ),
154 (
155 ScalarType::Bytes,
156 Datum::Bytes(&bytes),
157 Value::Bytes(bytes.clone()),
158 ),
159 (
160 ScalarType::String,
161 Datum::String(&string),
162 Value::String(string.clone()),
163 ),
164 ];
165 for (typ, datum, expected) in valid_pairings {
166 let desc = RelationDesc::builder()
167 .with_column("column1", typ.nullable(false))
168 .finish();
169 let schema_generator =
170 AvroSchemaGenerator::new(desc, false, Default::default(), "row", false, None, true)
171 .unwrap();
172 let avro_value =
173 encode_datums_as_avro(std::iter::once(datum), schema_generator.columns());
174 assert_eq!(
175 Value::Record(vec![("column1".into(), expected)]),
176 avro_value
177 );
178 }
179
180 Ok(())
181 }
182}