1use std::collections::BTreeMap;
27use std::fmt;
28
29use chrono::NaiveDateTime;
30use enum_kinds::EnumKind;
31use itertools::Itertools;
32use serde_json::Value as JsonValue;
33
34use crate::schema::{RecordField, SchemaNode, SchemaPiece, SchemaPieceOrNamed};
35
/// Error type that carries a free-form, human-readable message.
///
/// Going by its name it reports schema-resolution failures; the code that
/// produces it is not part of this section of the file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SchemaResolutionError(pub String);

impl SchemaResolutionError {
    /// Builds an error from anything that converts into a `String`
    /// (for example `&str` or `String`).
    pub fn new<S>(msg: S) -> SchemaResolutionError
    where
        S: Into<String>,
    {
        Self(msg.into())
    }
}

impl fmt::Display for SchemaResolutionError {
    /// Prints the wrapped message.
    ///
    /// Delegates to `String`'s own `Display` implementation so that any
    /// width, fill or precision flags given by the caller are honored.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl std::error::Error for SchemaResolutionError {}
56
/// A decimal number held as raw unscaled bytes together with its precision
/// and scale.
///
/// `Value::validate` accepts a `Value::Decimal` only when both `precision`
/// and `scale` equal the ones declared by the decimal schema; the `unscaled`
/// bytes are not inspected there.
#[derive(Clone, Debug, PartialEq)]
pub struct DecimalValue {
    /// Bytes of the unscaled integer.
    // NOTE(review): presumably big-endian two's complement, as the Avro
    // decimal logical type prescribes — confirm against the encoder/decoder.
    pub unscaled: Vec<u8>,
    /// Declared precision; compared against the schema's precision on validation.
    pub precision: usize,
    /// Declared scale; compared against the schema's scale on validation.
    pub scale: usize,
}
64
65#[derive(Clone, Copy, Debug, PartialEq, EnumKind)] #[enum_kind(ScalarKind)]
67pub enum Scalar {
68    Null,
69    Boolean(bool),
70    Int(i32),
71    Long(i64),
72    Float(f32),
73    Double(f64),
74    Date(i32),
75    Timestamp(NaiveDateTime),
76}
77
78impl From<Scalar> for Value {
79    fn from(s: Scalar) -> Self {
80        match s {
81            Scalar::Null => Value::Null,
82            Scalar::Boolean(v) => Value::Boolean(v),
83            Scalar::Int(v) => Value::Int(v),
84            Scalar::Long(v) => Value::Long(v),
85            Scalar::Float(v) => Value::Float(v),
86            Scalar::Double(v) => Value::Double(v),
87            Scalar::Date(v) => Value::Date(v),
88            Scalar::Timestamp(v) => Value::Timestamp(v),
89        }
90    }
91}
92
93#[derive(Clone, Debug, PartialEq)]
97pub enum Value {
98    Null,
101    Boolean(bool),
103    Int(i32),
105    Long(i64),
107    Float(f32),
109    Double(f64),
111    Date(i32),
114    Timestamp(NaiveDateTime),
116
117    Decimal(DecimalValue),
123    Bytes(Vec<u8>),
125    String(String),
127    Fixed(usize, Vec<u8>),
130    Enum(usize, String),
137    Union {
139        index: usize,
141        inner: Box<Value>,
143        n_variants: usize,
146        null_variant: Option<usize>,
148    },
149    Array(Vec<Value>),
151    Map(BTreeMap<String, Value>),
153    Record(Vec<(String, Value)>),
160    Json(serde_json::Value),
165    Uuid(uuid::Uuid),
167}
168
169pub trait ToAvro {
172    fn avro(self) -> Value;
174}
175
176macro_rules! to_avro(
177    ($t:ty, $v:expr) => (
178        impl ToAvro for $t {
179            fn avro(self) -> Value {
180                $v(self)
181            }
182        }
183    );
184);
185
186to_avro!(bool, Value::Boolean);
187to_avro!(i32, Value::Int);
188to_avro!(i64, Value::Long);
189to_avro!(f32, Value::Float);
190to_avro!(f64, Value::Double);
191to_avro!(String, Value::String);
192
193impl ToAvro for () {
194    fn avro(self) -> Value {
195        Value::Null
196    }
197}
198
199impl ToAvro for usize {
200    fn avro(self) -> Value {
201        (self as i64).avro()
202    }
203}
204
205impl<'a> ToAvro for &'a str {
206    fn avro(self) -> Value {
207        Value::String(self.to_owned())
208    }
209}
210
211impl<'a> ToAvro for &'a [u8] {
212    fn avro(self) -> Value {
213        Value::Bytes(self.to_owned())
214    }
215}
216
217impl<T> ToAvro for BTreeMap<String, T>
218where
219    T: ToAvro,
220{
221    fn avro(self) -> Value {
222        Value::Map(
223            self.into_iter()
224                .map(|(key, value)| (key, value.avro()))
225                .collect::<_>(),
226        )
227    }
228}
229
230impl<'a, T> ToAvro for BTreeMap<&'a str, T>
231where
232    T: ToAvro,
233{
234    fn avro(self) -> Value {
235        Value::Map(
236            self.into_iter()
237                .map(|(key, value)| (key.to_owned(), value.avro()))
238                .collect::<_>(),
239        )
240    }
241}
242
243impl ToAvro for Value {
244    fn avro(self) -> Value {
245        self
246    }
247}
248
249impl Default for Value {
260    fn default() -> Self {
261        Value::Null
262    }
263}
264
265#[derive(Debug, Clone)]
267pub struct Record<'a> {
268    pub fields: Vec<(String, Value)>,
272    schema_lookup: &'a BTreeMap<String, usize>,
273    schema_fields: &'a Vec<RecordField>,
274}
275
276impl<'a> Record<'a> {
277    pub fn new(schema: SchemaNode<'a>) -> Option<Record<'a>> {
281        let ret = match schema.inner {
282            SchemaPiece::Record {
283                fields: schema_fields,
284                lookup: schema_lookup,
285                ..
286            } => {
287                let mut fields = Vec::with_capacity(schema_fields.len());
288                for schema_field in schema_fields.iter() {
289                    fields.push((schema_field.name.clone(), Value::Null));
290                }
291
292                Some(Record {
293                    fields,
294                    schema_lookup,
295                    schema_fields,
296                })
297            }
298            _ => None,
299        };
300        ret
301    }
302
303    pub fn put<V>(&mut self, field: &str, value: V)
309    where
310        V: ToAvro,
311    {
312        if let Some(&position) = self.schema_lookup.get(field) {
313            self.fields[position].1 = value.avro()
314        }
315    }
316
317    pub fn field_by_name(&self, name: &str) -> Option<&'a RecordField> {
319        self.schema_lookup
320            .get(name)
321            .map(|idx| &self.schema_fields[*idx])
322    }
323}
324
325impl<'a> ToAvro for Record<'a> {
326    fn avro(self) -> Value {
327        Value::Record(self.fields)
328    }
329}
330
331impl ToAvro for JsonValue {
332    fn avro(self) -> Value {
333        match self {
334            JsonValue::Null => Value::Null,
335            JsonValue::Bool(b) => Value::Boolean(b),
336            JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
337            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
338            JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), JsonValue::String(s) => Value::String(s),
340            JsonValue::Array(items) => {
341                Value::Array(items.into_iter().map(|item| item.avro()).collect::<_>())
342            }
343            JsonValue::Object(items) => Value::Map(
344                items
345                    .into_iter()
346                    .map(|(key, value)| (key, value.avro()))
347                    .collect::<_>(),
348            ),
349        }
350    }
351}
352
353impl Value {
354    pub fn validate(&self, schema: SchemaNode) -> bool {
359        match (self, schema.inner) {
360            (&Value::Null, SchemaPiece::Null) => true,
361            (&Value::Boolean(_), SchemaPiece::Boolean) => true,
362            (&Value::Int(_), SchemaPiece::Int) => true,
363            (&Value::Long(_), SchemaPiece::Long) => true,
364            (&Value::Float(_), SchemaPiece::Float) => true,
365            (&Value::Double(_), SchemaPiece::Double) => true,
366            (&Value::Date(_), SchemaPiece::Date) => true,
367            (&Value::Timestamp(_), SchemaPiece::TimestampMicro) => true,
368            (&Value::Timestamp(_), SchemaPiece::TimestampMilli) => true,
369            (
370                &Value::Decimal(DecimalValue {
371                    precision: vp,
372                    scale: vs,
373                    ..
374                }),
375                SchemaPiece::Decimal {
376                    precision: sp,
377                    scale: ss,
378                    fixed_size: _,
379                },
380            ) => vp == *sp && vs == *ss,
381            (&Value::Bytes(_), SchemaPiece::Bytes) => true,
382            (&Value::String(_), SchemaPiece::String) => true,
383            (&Value::Fixed(n, _), SchemaPiece::Fixed { size }) => n == *size,
384            (&Value::String(ref s), SchemaPiece::Enum { symbols, .. }) => symbols.contains(s),
385            (&Value::Enum(i, ref s), SchemaPiece::Enum { symbols, .. }) => {
386                symbols.get(i).map(|symbol| symbol == s).unwrap_or(false)
387            }
388            (
389                &Value::Union {
390                    index,
391                    ref inner,
392                    n_variants,
393                    null_variant,
394                },
395                SchemaPiece::Union(schema_inner),
396            ) => {
397                schema_inner.variants().len() > index
398                    && n_variants == schema_inner.variants().len()
399                    && inner.validate(schema.step(&schema_inner.variants()[index]))
400                    && match null_variant {
401                        None => !schema_inner
402                            .variants()
403                            .iter()
404                            .any(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
405                        Some(null_variant_idx) => {
406                            schema_inner.variants().get(null_variant_idx)
407                                == Some(&SchemaPieceOrNamed::Piece(SchemaPiece::Null))
408                        }
409                    }
410            }
411            (&Value::Array(ref items), SchemaPiece::Array(inner)) => {
412                let node = schema.step(&**inner);
413                items.iter().all(|item| item.validate(node))
414            }
415            (&Value::Map(ref items), SchemaPiece::Map(inner)) => {
416                let node = schema.step(&**inner);
417                items.iter().all(|(_, value)| value.validate(node))
418            }
419            (&Value::Record(ref record_fields), SchemaPiece::Record { fields, .. }) => {
420                fields.len() == record_fields.len()
421                    && fields.iter().zip_eq(record_fields.iter()).all(
422                        |(field, &(ref name, ref value))| {
423                            let node = schema.step(&field.schema);
424                            field.name == *name && value.validate(node)
425                        },
426                    )
427            }
428            (Value::Json(_), SchemaPiece::Json) => true,
429            (Value::Uuid(_), SchemaPiece::Uuid) => true,
430            _ => false,
431        }
432    }
433
434    pub fn into_string(self) -> Option<String> {
436        match self {
437            Value::String(s) => Some(s),
438            _ => None,
439        }
440    }
441
442    pub fn into_nullable_bool(self) -> Option<bool> {
443        match self {
444            Value::Boolean(b) => Some(b),
445            Value::Union { inner, .. } => inner.into_nullable_bool(),
446            _ => None,
447        }
448    }
449
450    pub fn into_integral(self) -> Option<i64> {
451        match self {
452            Value::Int(i) => Some(i as i64),
453            Value::Long(l) => Some(l),
454            _ => None,
455        }
456    }
457
458    pub fn into_usize(self) -> Option<usize> {
459        self.into_integral().and_then(|i| i.try_into().ok())
460    }
461}
462
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use crate::Schema;

    use super::*;

    // Table-driven check of `Value::validate` over scalars, unions and arrays.
    #[mz_ore::test]
    fn validate() {
        // (value, schema JSON, expected result of `validate`)
        let value_schema_valid = vec![
            (Value::Int(42), "\"int\"", true),
            (Value::Int(42), "\"boolean\"", false),
            (
                Value::Union {
                    index: 0,
                    inner: Box::new(Value::Null),
                    n_variants: 2,
                    null_variant: Some(0),
                },
                r#"["null", "int"]"#,
                true,
            ),
            (
                Value::Union {
                    index: 1,
                    inner: Box::new(Value::Int(42)),
                    n_variants: 2,
                    null_variant: Some(0),
                },
                r#"["null", "int"]"#,
                true,
            ),
            (
                Value::Union {
                    index: 1,
                    inner: Box::new(Value::Null),
                    n_variants: 2,
                    null_variant: Some(1),
                },
                r#"["double", "int"]"#,
                false,
            ),
            (
                Value::Union {
                    index: 3,
                    inner: Box::new(Value::Int(42)),
                    n_variants: 4,
                    null_variant: Some(0),
                },
                r#"["null", "double", "string", "int"]"#,
                true,
            ),
            (
                Value::Array(vec![Value::Long(42i64)]),
                r#"{"type": "array", "items": "long"}"#,
                true,
            ),
            (
                Value::Array(vec![Value::Boolean(true)]),
                r#"{"type": "array", "items": "long"}"#,
                false,
            ),
            (Value::Record(vec![]), "\"null\"", false),
        ];

        for (value, schema_str, valid) in value_schema_valid.into_iter() {
            let schema = Schema::from_str(schema_str).unwrap();
            assert_eq!(
                valid,
                value.validate(schema.top_node()),
                "Schema failed to validate against value: {} {:#?}",
                schema_str,
                value
            );
        }
    }

    // A fixed value is valid only when its declared size equals the schema's.
    #[mz_ore::test]
    fn validate_fixed() {
        let schema =
            Schema::from_str(r#"{"type": "fixed", "size": 4, "name": "some_fixed"}"#).unwrap();

        assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(schema.top_node()));
        assert!(!Value::Fixed(5, vec![0, 0, 0, 0, 0]).validate(schema.top_node()));
    }

    // Enum values need a matching position and symbol; plain strings only
    // need to be a known symbol.
    #[mz_ore::test]
    fn validate_enum() {
        let schema = Schema::from_str(r#"{"type": "enum", "name": "some_enum", "symbols": ["spades", "hearts", "diamonds", "clubs"]}"#).unwrap();

        assert!(Value::Enum(0, "spades".to_string()).validate(schema.top_node()));
        assert!(Value::String("spades".to_string()).validate(schema.top_node()));

        assert!(!Value::Enum(1, "spades".to_string()).validate(schema.top_node()));
        assert!(!Value::String("lorem".to_string()).validate(schema.top_node()));

        // Same symbols in a different order: position 0 is no longer "spades".
        let other_schema = Schema::from_str(r#"{"type": "enum", "name": "some_other_enum", "symbols": ["hearts", "diamonds", "clubs", "spades"]}"#).unwrap();

        assert!(!Value::Enum(0, "spades".to_string()).validate(other_schema.top_node()));
    }

    // Records must match the schema in field count, names, order and types.
    #[mz_ore::test]
    fn validate_record() {
        let schema = Schema::from_str(
            r#"{
           "type": "record",
           "fields": [
             {"type": "long", "name": "a"},
             {"type": "string", "name": "b"}
           ],
           "name": "some_record"
        }"#,
        )
        .unwrap();

        // Exact match.
        assert!(
            Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
            ])
            .validate(schema.top_node())
        );

        // Right fields, wrong order.
        assert!(
            !Value::Record(vec![
                ("b".to_string(), Value::String("foo".to_string())),
                ("a".to_string(), Value::Long(42i64)),
            ])
            .validate(schema.top_node())
        );

        // Wrong value type for "a".
        assert!(
            !Value::Record(vec![
                ("a".to_string(), Value::Boolean(false)),
                ("b".to_string(), Value::String("foo".to_string())),
            ])
            .validate(schema.top_node())
        );

        // Unknown field name.
        assert!(
            !Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("c".to_string(), Value::String("foo".to_string())),
            ])
            .validate(schema.top_node())
        );

        // One field too many.
        assert!(
            !Value::Record(vec![
                ("a".to_string(), Value::Long(42i64)),
                ("b".to_string(), Value::String("foo".to_string())),
                ("c".to_string(), Value::Null),
            ])
            .validate(schema.top_node())
        );
    }

    // Decimals must agree with the schema on precision and scale.
    #[mz_ore::test]
    fn validate_decimal() {
        let schema = Schema::from_str(
            r#"
            {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 12,
                "scale": 5
            }
        "#,
        )
        .unwrap();

        assert!(
            Value::Decimal(DecimalValue {
                unscaled: vec![7],
                precision: 12,
                scale: 5
            })
            .validate(schema.top_node())
        );

        assert!(
            !Value::Decimal(DecimalValue {
                unscaled: vec![7],
                precision: 13,
                scale: 5
            })
            .validate(schema.top_node())
        );
    }
}