1use std::collections::BTreeMap;
21use std::convert::{TryFrom, TryInto};
22use std::fmt::Debug;
23
24use anyhow::{Context, anyhow, bail};
25use byteorder::{BigEndian, ByteOrder};
26use chrono::NaiveDate;
27pub use mz_avro::schema::{Schema, SchemaKind, SchemaNode, SchemaPiece, SchemaPieceOrNamed};
30pub use mz_avro::types::{DecimalValue, ToAvro, Value};
31pub use mz_avro::{from_avro_datum, to_avro_datum};
32pub use mz_interchange::avro::parse_schema;
33use serde_json::Value as JsonValue;
34
35pub fn from_json(json: &JsonValue, schema: SchemaNode) -> Result<Value, anyhow::Error> {
38 match (&json, &schema.inner) {
39 (JsonValue::Null, SchemaPiece::Null) => Ok(Value::Null),
40 (JsonValue::Bool(b), SchemaPiece::Boolean) => Ok(Value::Boolean(*b)),
41 (JsonValue::Number(n), SchemaPiece::Int) => Ok(Value::Int(n.as_i64().unwrap().try_into()?)),
42 (JsonValue::Number(n), SchemaPiece::Long) => Ok(Value::Long(n.as_i64().unwrap())),
43 (JsonValue::Number(n), SchemaPiece::Float) => {
44 #[allow(clippy::as_conversions)]
46 Ok(Value::Float(n.as_f64().unwrap() as f32))
47 }
48 (JsonValue::Number(n), SchemaPiece::Double) => Ok(Value::Double(n.as_f64().unwrap())),
49 (JsonValue::Number(n), SchemaPiece::Date) => {
50 Ok(Value::Date(i32::try_from(n.as_i64().unwrap())?))
51 }
52 (JsonValue::Number(n), SchemaPiece::TimestampMilli) => {
53 let ts = n.as_i64().unwrap();
54 Ok(Value::Timestamp(
55 chrono::DateTime::from_timestamp_millis(ts)
56 .ok_or_else(|| anyhow!("timestamp out of bounds"))?
57 .naive_utc(),
58 ))
59 }
60 (JsonValue::Number(n), SchemaPiece::TimestampMicro) => {
61 let ts = n.as_i64().unwrap();
62 Ok(Value::Timestamp(
63 chrono::DateTime::from_timestamp_micros(ts)
64 .ok_or_else(|| anyhow!("timestamp out of bounds"))?
65 .naive_utc(),
66 ))
67 }
68 (JsonValue::Array(items), SchemaPiece::Array(inner)) => Ok(Value::Array(
69 items
70 .iter()
71 .map(|x| from_json(x, schema.step(&**inner)))
72 .collect::<Result<_, _>>()?,
73 )),
74 (JsonValue::String(s), SchemaPiece::String) => Ok(Value::String(s.clone())),
75 (
76 JsonValue::Array(items),
77 SchemaPiece::Decimal {
78 precision, scale, ..
79 },
80 ) => {
81 let bytes = match items
82 .iter()
83 .map(|x| x.as_i64().and_then(|x| u8::try_from(x).ok()))
84 .collect::<Option<Vec<u8>>>()
85 {
86 Some(bytes) => bytes,
87 None => bail!("decimal was not represented by byte array"),
88 };
89 Ok(Value::Decimal(DecimalValue {
90 unscaled: bytes,
91 precision: *precision,
92 scale: *scale,
93 }))
94 }
95 (JsonValue::Array(items), SchemaPiece::Fixed { size }) => {
96 let bytes = match items
97 .iter()
98 .map(|x| x.as_i64().and_then(|x| u8::try_from(x).ok()))
99 .collect::<Option<Vec<u8>>>()
100 {
101 Some(bytes) => bytes,
102 None => bail!("fixed was not represented by byte array"),
103 };
104 if *size != bytes.len() {
105 bail!("expected fixed size {}, got {}", *size, bytes.len())
106 } else {
107 Ok(Value::Fixed(*size, bytes))
108 }
109 }
110 (JsonValue::String(s), SchemaPiece::Json) => {
111 let j = serde_json::from_str(s)?;
112 Ok(Value::Json(j))
113 }
114 (JsonValue::String(s), SchemaPiece::Uuid) => {
115 let u = uuid::Uuid::parse_str(s)?;
116 Ok(Value::Uuid(u))
117 }
118 (JsonValue::String(s), SchemaPiece::Enum { symbols, .. }) => {
119 if symbols.contains(s) {
120 Ok(Value::String(s.clone()))
121 } else {
122 bail!("Unrecognized enum variant: {}", s)
123 }
124 }
125 (JsonValue::Object(items), SchemaPiece::Record { .. }) => {
126 let mut builder = mz_avro::types::Record::new(schema)
127 .expect("`Record::new` should never fail if schema piece is a record!");
128 for (key, val) in items {
129 let field = builder
130 .field_by_name(key)
131 .ok_or_else(|| anyhow!("No such field {} in record: {}", key, val))?;
132 let val = from_json(val, schema.step(&field.schema))?;
133 builder.put(key, val);
134 }
135 Ok(builder.avro())
136 }
137 (JsonValue::Object(items), SchemaPiece::Map(m)) => {
138 let mut map = BTreeMap::new();
139 for (k, v) in items {
140 let (inner, name) = m.get_piece_and_name(schema.root);
141 map.insert(
142 k.to_owned(),
143 from_json(
144 v,
145 SchemaNode {
146 root: schema.root,
147 inner,
148 name,
149 },
150 )?,
151 );
152 }
153 Ok(Value::Map(map))
154 }
155 (val, SchemaPiece::Union(us)) => {
156 let variants = us.variants();
157 let null_variant = variants
158 .iter()
159 .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
160 if let JsonValue::Null = val {
161 return if let Some(nv) = null_variant {
162 Ok(Value::Union {
163 index: nv,
164 inner: Box::new(Value::Null),
165 n_variants: variants.len(),
166 null_variant,
167 })
168 } else {
169 bail!("No `null` value in union schema.")
170 };
171 }
172 let items = match val {
173 JsonValue::Object(items) => items,
174 _ => bail!(
175 "Union schema element must be `null` or a map from type name to value; found {:?}",
176 val
177 ),
178 };
179 let (name, val) = if items.len() == 1 {
180 (items.keys().next().unwrap(), items.values().next().unwrap())
181 } else {
182 bail!(
183 "Expected one-element object to match union schema: {:?} vs {:?}",
184 json,
185 schema
186 );
187 };
188 for (i, variant) in variants.iter().enumerate() {
189 let name_matches = match variant {
190 SchemaPieceOrNamed::Piece(piece) => SchemaKind::from(piece).name() == name,
191 SchemaPieceOrNamed::Named(idx) => {
192 let schema_name = &schema.root.lookup(*idx).name;
193 if name.chars().any(|ch| ch == '.') {
194 name == &format!(
195 "{}.{}",
196 schema_name.namespace(),
197 schema_name.base_name()
198 )
199 } else {
200 name == schema_name.base_name()
201 }
202 }
203 };
204 if name_matches {
205 match from_json(val, schema.step(variant)) {
206 Ok(avro) => {
207 return Ok(Value::Union {
208 index: i,
209 inner: Box::new(avro),
210 n_variants: variants.len(),
211 null_variant,
212 });
213 }
214 Err(msg) => return Err(msg),
215 }
216 }
217 }
218 bail!(
219 "Type not found in union: {}. variants: {:#?}",
220 name,
221 variants
222 )
223 }
224 _ => bail!(
225 "unable to match JSON value to schema: {:?} vs {:?}",
226 json,
227 schema
228 ),
229 }
230}
231
232pub fn from_confluent_bytes(schema: &Schema, mut bytes: &[u8]) -> Result<Value, anyhow::Error> {
238 if bytes.len() < 5 {
239 bail!(
240 "avro datum is too few bytes: expected at least 5 bytes, got {}",
241 bytes.len()
242 );
243 }
244 let magic = bytes[0];
245 let _schema_id = BigEndian::read_i32(&bytes[1..5]);
246 bytes = &bytes[5..];
247
248 if magic != 0 {
249 bail!(
250 "wrong avro serialization magic: expected 0, got {}",
251 bytes[0]
252 );
253 }
254
255 let datum = from_avro_datum(schema, &mut bytes).context("decoding avro datum")?;
256 Ok(datum)
257}
258
259#[derive(Clone)]
266pub struct DebugValue(pub Value);
267
268impl Debug for DebugValue {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 match &self.0 {
271 Value::Timestamp(t) => write!(
272 f,
273 "Timestamp(\"{:?}\", {} micros, {} millis)",
274 t,
275 t.and_utc().timestamp_micros(),
276 t.and_utc().timestamp_millis()
277 ),
278 Value::Date(d) => write!(
279 f,
280 "Date({:?}, \"{}\")",
281 d,
282 NaiveDate::from_num_days_from_ce_opt(*d).unwrap()
283 ),
284
285 Value::Record(r) => f
287 .debug_set()
288 .entries(r.iter().map(|(s, v)| (s, DebugValue(v.clone()))))
289 .finish(),
290 Value::Array(a) => f
291 .debug_set()
292 .entries(a.iter().map(|v| DebugValue(v.clone())))
293 .finish(),
294 Value::Union {
295 index,
296 inner,
297 n_variants,
298 null_variant,
299 } => f
300 .debug_struct("Union")
301 .field("index", index)
302 .field("inner", &DebugValue(*inner.clone()))
303 .field("n_variants", n_variants)
304 .field("null_variant", null_variant)
305 .finish(),
306
307 _ => write!(f, "{:?}", self.0),
308 }
309 }
310}