1use std::collections::BTreeMap;
27use std::fmt;
28
29use chrono::NaiveDateTime;
30use enum_kinds::EnumKind;
31use itertools::Itertools;
32use serde_json::Value as JsonValue;
33
34use crate::schema::{RecordField, SchemaNode, SchemaPiece, SchemaPieceOrNamed};
35
/// A schema-resolution error carrying a free-form, human-readable message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SchemaResolutionError(pub String);

impl SchemaResolutionError {
    /// Builds an error from anything convertible into a `String` (e.g. `&str` or `String`).
    pub fn new<S: Into<String>>(msg: S) -> SchemaResolutionError {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for SchemaResolutionError {
    /// Writes the message verbatim, honouring any width/fill/precision flags on `f`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for SchemaResolutionError {}
56
/// A decimal number: its unscaled integer bytes plus the precision and scale it was
/// declared with.
///
/// NOTE(review): per the Avro `decimal` logical type the numeric value is presumably
/// `unscaled * 10^-scale` — confirm against the encoder/decoder.
#[derive(Clone, Debug, PartialEq)]
pub struct DecimalValue {
    /// Raw bytes of the unscaled integer. NOTE(review): presumably big-endian
    /// two's-complement, as in the Avro spec — confirm.
    pub unscaled: Vec<u8>,
    /// Declared precision; must equal the schema's `precision` for
    /// [`Value::validate`] to accept the value.
    pub precision: usize,
    /// Declared scale; must equal the schema's `scale` for [`Value::validate`] to
    /// accept the value.
    pub scale: usize,
}
64
/// The subset of [`Value`] variants whose payloads are plain `Copy` data (nothing
/// heap-allocated). Every variant converts into the same-named [`Value`] variant via
/// `From`/`Into`.
///
/// NOTE(review): the `EnumKind` derive with `#[enum_kind(ScalarKind)]` presumably
/// generates a payload-free `ScalarKind` enum mirroring these variants (per the
/// `enum_kinds` crate) — confirm.
#[derive(Clone, Copy, Debug, PartialEq, EnumKind)]
#[enum_kind(ScalarKind)]
pub enum Scalar {
    /// The `null` value.
    Null,
    /// A `boolean`.
    Boolean(bool),
    /// A 32-bit signed `int`.
    Int(i32),
    /// A 64-bit signed `long`.
    Long(i64),
    /// A 32-bit `float`.
    Float(f32),
    /// A 64-bit `double`.
    Double(f64),
    /// A date; converts to [`Value::Date`]. NOTE(review): presumably days since the
    /// Unix epoch — confirm.
    Date(i32),
    /// A timestamp; converts to [`Value::Timestamp`].
    Timestamp(NaiveDateTime),
}
77
78impl From<Scalar> for Value {
79 fn from(s: Scalar) -> Self {
80 match s {
81 Scalar::Null => Value::Null,
82 Scalar::Boolean(v) => Value::Boolean(v),
83 Scalar::Int(v) => Value::Int(v),
84 Scalar::Long(v) => Value::Long(v),
85 Scalar::Float(v) => Value::Float(v),
86 Scalar::Double(v) => Value::Double(v),
87 Scalar::Date(v) => Value::Date(v),
88 Scalar::Timestamp(v) => Value::Timestamp(v),
89 }
90 }
91}
92
/// A dynamically-typed Avro value. Use [`Value::validate`] to check it against a
/// schema.
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    /// The `null` value.
    Null,
    /// A `boolean`.
    Boolean(bool),
    /// A 32-bit signed `int`.
    Int(i32),
    /// A 64-bit signed `long`.
    Long(i64),
    /// A 32-bit `float`.
    Float(f32),
    /// A 64-bit `double`.
    Double(f64),
    /// A `date` logical value. NOTE(review): presumably days since the Unix epoch —
    /// confirm against the encoder.
    Date(i32),
    /// A timestamp; validates against both the millisecond- and microsecond-precision
    /// timestamp schemas.
    Timestamp(NaiveDateTime),

    /// A `decimal` logical value; validates only when its precision and scale equal
    /// the schema's.
    Decimal(DecimalValue),
    /// A sequence of raw `bytes`.
    Bytes(Vec<u8>),
    /// A `string`; also validates against an `enum` schema that lists it as a symbol.
    String(String),
    /// A `fixed` value: the declared size, then the bytes. Validation compares the
    /// declared size with the schema's `size`.
    Fixed(usize, Vec<u8>),
    /// An `enum` value: the symbol's index, then the symbol itself. Both must agree
    /// with the schema's symbol list.
    Enum(usize, String),
    /// A value of one branch of a `union`.
    Union {
        /// Position of the selected branch within the union's variants.
        index: usize,
        /// The branch's value; validated against the variant at `index`.
        inner: Box<Value>,
        /// Total number of variants; must equal the schema's variant count.
        n_variants: usize,
        /// Position of the union's `null` variant, or `None` if it has none.
        null_variant: Option<usize>,
    },
    /// An Avro `array`; every item is validated against the item schema.
    Array(Vec<Value>),
    /// An Avro `map` with string keys; every value is validated against the value
    /// schema.
    Map(BTreeMap<String, Value>),
    /// An Avro `record`: `(field name, value)` pairs in the schema's field order.
    Record(Vec<(String, Value)>),
    /// An arbitrary JSON document; validates against the `Json` schema piece.
    Json(serde_json::Value),
    /// A UUID; validates against the `Uuid` schema piece.
    Uuid(uuid::Uuid),
}
168
/// Conversion of a Rust value into an Avro [`Value`], consuming the input.
pub trait ToAvro {
    /// Converts `self` into the corresponding Avro [`Value`].
    fn avro(self) -> Value;
}
175
176macro_rules! to_avro(
177 ($t:ty, $v:expr) => (
178 impl ToAvro for $t {
179 fn avro(self) -> Value {
180 $v(self)
181 }
182 }
183 );
184);
185
186to_avro!(bool, Value::Boolean);
187to_avro!(i32, Value::Int);
188to_avro!(i64, Value::Long);
189to_avro!(f32, Value::Float);
190to_avro!(f64, Value::Double);
191to_avro!(String, Value::String);
192
193impl ToAvro for () {
194 fn avro(self) -> Value {
195 Value::Null
196 }
197}
198
199impl ToAvro for usize {
200 fn avro(self) -> Value {
201 (self as i64).avro()
202 }
203}
204
205impl<'a> ToAvro for &'a str {
206 fn avro(self) -> Value {
207 Value::String(self.to_owned())
208 }
209}
210
211impl<'a> ToAvro for &'a [u8] {
212 fn avro(self) -> Value {
213 Value::Bytes(self.to_owned())
214 }
215}
216
217impl<T> ToAvro for BTreeMap<String, T>
218where
219 T: ToAvro,
220{
221 fn avro(self) -> Value {
222 Value::Map(
223 self.into_iter()
224 .map(|(key, value)| (key, value.avro()))
225 .collect::<_>(),
226 )
227 }
228}
229
230impl<'a, T> ToAvro for BTreeMap<&'a str, T>
231where
232 T: ToAvro,
233{
234 fn avro(self) -> Value {
235 Value::Map(
236 self.into_iter()
237 .map(|(key, value)| (key.to_owned(), value.avro()))
238 .collect::<_>(),
239 )
240 }
241}
242
243impl ToAvro for Value {
244 fn avro(self) -> Value {
245 self
246 }
247}
248
249impl Default for Value {
260 fn default() -> Self {
261 Value::Null
262 }
263}
264
/// A record under construction, tied to the schema it was created from.
///
/// `fields` holds one `(name, value)` pair per schema field, in schema order. The
/// borrowed lookup table maps a field name to its index in that order.
#[derive(Debug, Clone)]
pub struct Record<'a> {
    /// `(field name, value)` pairs in schema order; [`Record::new`] initializes every
    /// value to [`Value::Null`].
    pub fields: Vec<(String, Value)>,
    // Field name -> position in `fields` / `schema_fields`, borrowed from the schema.
    schema_lookup: &'a BTreeMap<String, usize>,
    // The schema's field definitions, in the same order as `fields`.
    schema_fields: &'a Vec<RecordField>,
}
275
276impl<'a> Record<'a> {
277 pub fn new(schema: SchemaNode<'a>) -> Option<Record<'a>> {
281 let ret = match schema.inner {
282 SchemaPiece::Record {
283 fields: schema_fields,
284 lookup: schema_lookup,
285 ..
286 } => {
287 let mut fields = Vec::with_capacity(schema_fields.len());
288 for schema_field in schema_fields.iter() {
289 fields.push((schema_field.name.clone(), Value::Null));
290 }
291
292 Some(Record {
293 fields,
294 schema_lookup,
295 schema_fields,
296 })
297 }
298 _ => None,
299 };
300 ret
301 }
302
303 pub fn put<V>(&mut self, field: &str, value: V)
309 where
310 V: ToAvro,
311 {
312 if let Some(&position) = self.schema_lookup.get(field) {
313 self.fields[position].1 = value.avro()
314 }
315 }
316
317 pub fn field_by_name(&self, name: &str) -> Option<&'a RecordField> {
319 self.schema_lookup
320 .get(name)
321 .map(|idx| &self.schema_fields[*idx])
322 }
323}
324
325impl<'a> ToAvro for Record<'a> {
326 fn avro(self) -> Value {
327 Value::Record(self.fields)
328 }
329}
330
331impl ToAvro for JsonValue {
332 fn avro(self) -> Value {
333 match self {
334 JsonValue::Null => Value::Null,
335 JsonValue::Bool(b) => Value::Boolean(b),
336 JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
337 JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
338 JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), JsonValue::String(s) => Value::String(s),
340 JsonValue::Array(items) => {
341 Value::Array(items.into_iter().map(|item| item.avro()).collect::<_>())
342 }
343 JsonValue::Object(items) => Value::Map(
344 items
345 .into_iter()
346 .map(|(key, value)| (key, value.avro()))
347 .collect::<_>(),
348 ),
349 }
350 }
351}
352
353impl Value {
354 pub fn validate(&self, schema: SchemaNode) -> bool {
359 match (self, schema.inner) {
360 (&Value::Null, SchemaPiece::Null) => true,
361 (&Value::Boolean(_), SchemaPiece::Boolean) => true,
362 (&Value::Int(_), SchemaPiece::Int) => true,
363 (&Value::Long(_), SchemaPiece::Long) => true,
364 (&Value::Float(_), SchemaPiece::Float) => true,
365 (&Value::Double(_), SchemaPiece::Double) => true,
366 (&Value::Date(_), SchemaPiece::Date) => true,
367 (&Value::Timestamp(_), SchemaPiece::TimestampMicro) => true,
368 (&Value::Timestamp(_), SchemaPiece::TimestampMilli) => true,
369 (
370 &Value::Decimal(DecimalValue {
371 precision: vp,
372 scale: vs,
373 ..
374 }),
375 SchemaPiece::Decimal {
376 precision: sp,
377 scale: ss,
378 fixed_size: _,
379 },
380 ) => vp == *sp && vs == *ss,
381 (&Value::Bytes(_), SchemaPiece::Bytes) => true,
382 (&Value::String(_), SchemaPiece::String) => true,
383 (&Value::Fixed(n, _), SchemaPiece::Fixed { size }) => n == *size,
384 (&Value::String(ref s), SchemaPiece::Enum { symbols, .. }) => symbols.contains(s),
385 (&Value::Enum(i, ref s), SchemaPiece::Enum { symbols, .. }) => {
386 symbols.get(i).map(|symbol| symbol == s).unwrap_or(false)
387 }
388 (
389 &Value::Union {
390 index,
391 ref inner,
392 n_variants,
393 null_variant,
394 },
395 SchemaPiece::Union(schema_inner),
396 ) => {
397 schema_inner.variants().len() > index
398 && n_variants == schema_inner.variants().len()
399 && inner.validate(schema.step(&schema_inner.variants()[index]))
400 && match null_variant {
401 None => !schema_inner
402 .variants()
403 .iter()
404 .any(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
405 Some(null_variant_idx) => {
406 schema_inner.variants().get(null_variant_idx)
407 == Some(&SchemaPieceOrNamed::Piece(SchemaPiece::Null))
408 }
409 }
410 }
411 (&Value::Array(ref items), SchemaPiece::Array(inner)) => {
412 let node = schema.step(&**inner);
413 items.iter().all(|item| item.validate(node))
414 }
415 (&Value::Map(ref items), SchemaPiece::Map(inner)) => {
416 let node = schema.step(&**inner);
417 items.iter().all(|(_, value)| value.validate(node))
418 }
419 (&Value::Record(ref record_fields), SchemaPiece::Record { fields, .. }) => {
420 fields.len() == record_fields.len()
421 && fields.iter().zip_eq(record_fields.iter()).all(
422 |(field, &(ref name, ref value))| {
423 let node = schema.step(&field.schema);
424 field.name == *name && value.validate(node)
425 },
426 )
427 }
428 (Value::Json(_), SchemaPiece::Json) => true,
429 (Value::Uuid(_), SchemaPiece::Uuid) => true,
430 _ => false,
431 }
432 }
433
434 pub fn into_string(self) -> Option<String> {
436 match self {
437 Value::String(s) => Some(s),
438 _ => None,
439 }
440 }
441
442 pub fn into_nullable_bool(self) -> Option<bool> {
443 match self {
444 Value::Boolean(b) => Some(b),
445 Value::Union { inner, .. } => inner.into_nullable_bool(),
446 _ => None,
447 }
448 }
449
450 pub fn into_integral(self) -> Option<i64> {
451 match self {
452 Value::Int(i) => Some(i as i64),
453 Value::Long(l) => Some(l),
454 _ => None,
455 }
456 }
457
458 pub fn into_usize(self) -> Option<usize> {
459 self.into_integral().and_then(|i| i.try_into().ok())
460 }
461}
462
// Unit tests for `Value::validate`, one test per schema kind.
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use crate::Schema;

    use super::*;

    // Table-driven checks: each entry is (value, schema JSON, expected validity).
    #[mz_ore::test]
    fn validate() {
        let value_schema_valid = vec![
            (Value::Int(42), "\"int\"", true),
            // Primitive kinds must match exactly.
            (Value::Int(42), "\"boolean\"", false),
            // Null branch of a nullable union, with `null_variant` pointing at it.
            (
                Value::Union {
                    index: 0,
                    inner: Box::new(Value::Null),
                    n_variants: 2,
                    null_variant: Some(0),
                },
                r#"["null", "int"]"#,
                true,
            ),
            // Non-null branch of the same union.
            (
                Value::Union {
                    index: 1,
                    inner: Box::new(Value::Null),
                    n_variants: 2,
                    null_variant: Some(1),
                },
                r#"["double", "int"]"#,
                false,
            ),
            (
                Value::Union {
                    index: 3,
                    inner: Box::new(Value::Int(42)),
                    n_variants: 4,
                    null_variant: Some(0),
                },
                r#"["null", "double", "string", "int"]"#,
                true,
            ),
            (
                Value::Array(vec![Value::Long(42i64)]),
                r#"{"type": "array", "items": "long"}"#,
                true,
            ),
            // Array items are validated against the item schema.
            (
                Value::Array(vec![Value::Boolean(true)]),
                r#"{"type": "array", "items": "long"}"#,
                false,
            ),
            (Value::Record(vec![]), "\"null\"", false),
        ];

        for (value, schema_str, valid) in value_schema_valid.into_iter() {
            let schema = Schema::from_str(schema_str).unwrap();
            assert_eq!(
                valid,
                value.validate(schema.top_node()),
                "Schema failed to validate against value: {} {:#?}",
                schema_str,
                value
            );
        }
    }

    // The declared size of a fixed value must equal the schema's size.
    #[mz_ore::test]
    fn validate_fixed() {
        let schema =
            Schema::from_str(r#"{"type": "fixed", "size": 4, "name": "some_fixed"}"#).unwrap();

        assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(schema.top_node()));
        assert!(!Value::Fixed(5, vec![0, 0, 0, 0, 0]).validate(schema.top_node()));
    }

    // Enum values must agree with the symbol list; bare strings must be a symbol.
    #[mz_ore::test]
    fn validate_enum() {
        let schema = Schema::from_str(r#"{"type": "enum", "name": "some_enum", "symbols": ["spades", "hearts", "diamonds", "clubs"]}"#).unwrap();

        assert!(Value::Enum(0, "spades".to_string()).validate(schema.top_node()));
        assert!(Value::String("spades".to_string()).validate(schema.top_node()));

        // Index 1 is "hearts", so the pairing (1, "spades") is inconsistent.
        assert!(!Value::Enum(1, "spades".to_string()).validate(schema.top_node()));
        assert!(!Value::String("lorem".to_string()).validate(schema.top_node()));

        let other_schema = Schema::from_str(r#"{"type": "enum", "name": "some_other_enum", "symbols": ["hearts", "diamonds", "clubs", "spades"]}"#).unwrap();

        // Same symbol set, different order: index 0 is "hearts" here.
        assert!(!Value::Enum(0, "spades".to_string()).validate(other_schema.top_node()));
    }

    // Record fields must match the schema's names, order, count, and field types.
    #[mz_ore::test]
    fn validate_record() {
        let schema = Schema::from_str(
            r#"{
            "type": "record",
            "fields": [
                {"type": "long", "name": "a"},
                {"type": "string", "name": "b"}
            ],
            "name": "some_record"
        }"#,
        )
        .unwrap();

        assert!(
            Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
            ])
            .validate(schema.top_node())
        );

        // Fields out of schema order.
        assert!(
            !Value::Record(vec![
                ("b".to_string(), Value::String("foo".to_string())),
                ("a".to_string(), Value::Long(42i64)),
            ])
            .validate(schema.top_node())
        );

        // Field value of the wrong type.
        assert!(
            !Value::Record(vec![
                ("a".to_string(), Value::Boolean(false)),
                ("b".to_string(), Value::String("foo".to_string())),
            ])
            .validate(schema.top_node())
        );

        // Field name not in the schema.
        assert!(
            !Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("c".to_string(), Value::String("foo".to_string())),
            ])
            .validate(schema.top_node())
        );

        // Extra field beyond the schema's field count.
        assert!(
            !Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
                ("c".to_string(), Value::Null),
            ])
            .validate(schema.top_node())
        );
    }

    // Decimal values must carry the schema's precision and scale.
    #[mz_ore::test]
    fn validate_decimal() {
        assert!(
            Value::Decimal(DecimalValue {
                unscaled: vec![7],
                precision: 12,
                scale: 5
            })
            .validate(
                Schema::from_str(
                    r#"
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 12,
                        "scale": 5
                    }
                "#
                )
                .unwrap()
                .top_node()
            )
        );

        // Precision 13 does not match the schema's precision of 12.
        assert!(
            !Value::Decimal(DecimalValue {
                unscaled: vec![7],
                precision: 13,
                scale: 5
            })
            .validate(
                Schema::from_str(
                    r#"
                    {
                        "type": "bytes",
                        "logicalType": "decimal",
                        "precision": 12,
                        "scale": 5
                    }
                "#
                )
                .unwrap()
                .top_node()
            )
        );
    }
}