mz_avro/
types.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic handling the intermediate representation of Avro values.
25
26use std::collections::BTreeMap;
27use std::fmt;
28
29use chrono::NaiveDateTime;
30use enum_kinds::EnumKind;
31use serde_json::Value as JsonValue;
32
33use crate::schema::{RecordField, SchemaNode, SchemaPiece, SchemaPieceOrNamed};
34
/// Error reported when Avro data cannot be resolved against a schema.
///
/// The wrapped `String` is the human-readable message, and it is exactly what
/// the `Display` implementation prints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaResolutionError(pub String);

impl SchemaResolutionError {
    /// Builds an error from anything that converts into a `String`.
    pub fn new<S>(msg: S) -> SchemaResolutionError
    where
        S: Into<String>,
    {
        let message: String = msg.into();
        Self(message)
    }
}

impl fmt::Display for SchemaResolutionError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Delegate to the message's own `Display` impl so that any formatter
        // options (width, alignment, ...) are handled the way `String` does.
        let Self(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for SchemaResolutionError {}
55
/// Payload of an Avro `decimal` value: the raw unscaled integer together with
/// the precision and scale the value was produced with.
#[derive(Debug, Clone, PartialEq)]
pub struct DecimalValue {
    /// An unscaled two's-complement integer value in big-endian byte order.
    pub unscaled: Vec<u8>,
    /// Precision of the decimal. `Value::validate` requires it to equal the
    /// precision declared by the schema.
    pub precision: usize,
    /// Scale of the decimal. `Value::validate` requires it to equal the scale
    /// declared by the schema.
    pub scale: usize,
}
63
64#[derive(Clone, Copy, Debug, PartialEq, EnumKind)] // Can't be Eq because there are floats
65#[enum_kind(ScalarKind)]
66pub enum Scalar {
67    Null,
68    Boolean(bool),
69    Int(i32),
70    Long(i64),
71    Float(f32),
72    Double(f64),
73    Date(i32),
74    Timestamp(NaiveDateTime),
75}
76
77impl From<Scalar> for Value {
78    fn from(s: Scalar) -> Self {
79        match s {
80            Scalar::Null => Value::Null,
81            Scalar::Boolean(v) => Value::Boolean(v),
82            Scalar::Int(v) => Value::Int(v),
83            Scalar::Long(v) => Value::Long(v),
84            Scalar::Float(v) => Value::Float(v),
85            Scalar::Double(v) => Value::Double(v),
86            Scalar::Date(v) => Value::Date(v),
87            Scalar::Timestamp(v) => Value::Timestamp(v),
88        }
89    }
90}
91
92/// Represents any valid Avro value
93/// More information about Avro values can be found in the
94/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
95#[derive(Clone, Debug, PartialEq)]
96pub enum Value {
97    // Fixed-length types
98    /// A `null` Avro value.
99    Null,
100    /// A `boolean` Avro value.
101    Boolean(bool),
102    /// A `int` Avro value.
103    Int(i32),
104    /// A `long` Avro value.
105    Long(i64),
106    /// A `float` Avro value.
107    Float(f32),
108    /// A `double` Avro value.
109    Double(f64),
110    /// A `Date` coming from an avro Logical `Date`, which is an i32 number of
111    /// days since the Unix epoch.
112    Date(i32),
113    /// A `DateTime` coming from an avro Logical `Timestamp`
114    Timestamp(NaiveDateTime),
115
116    // Variable-length types
117    /// A `decimal` Avro value
118    ///
119    /// The value of the decimal can be computed as follows:
120    /// <em>unscaled</em> × 10<sup>-<em>scale</em></sup>.
121    Decimal(DecimalValue),
122    /// A `bytes` Avro value.
123    Bytes(Vec<u8>),
124    /// A `string` Avro value.
125    String(String),
126    /// A `fixed` Avro value.
127    /// The size of the fixed value is represented as a `usize`.
128    Fixed(usize, Vec<u8>),
129    /// An `enum` Avro value.
130    ///
131    /// An Enum is represented by a symbol and its position in the symbols list
132    /// of its corresponding schema.
133    /// This allows schema-less encoding, as well as schema resolution while
134    /// reading values.
135    Enum(usize, String),
136    /// An `union` Avro value.
137    Union {
138        /// The index of this variant in the reader schema
139        index: usize,
140        /// The value of the variant
141        inner: Box<Value>,
142        // The next two metadata fields are necessary for the Materialize "flattened unions" decoding strategy to work properly.
143        /// The number of variants in the reader schema
144        n_variants: usize,
145        /// Which variant is null in the reader schema.
146        null_variant: Option<usize>,
147    },
148    /// An `array` Avro value.
149    Array(Vec<Value>),
150    /// A `map` Avro value.
151    Map(BTreeMap<String, Value>),
152    /// A `record` Avro value.
153    ///
154    /// A Record is represented by a vector of (`<field name>`, `value`).
155    /// This allows schema-less encoding.
156    ///
157    /// See [Record](types.Record) for a more user-friendly support.
158    Record(Vec<(String, Value)>),
159    /// A `string` Avro value that has been interpreted as JSON.
160    ///
161    /// This is not part of the Avro spec, but is emitted by Debezium,
162    /// and distinguished by setting the `"connect.name"` property to `"io.debezium.data.Json"`.
163    Json(serde_json::Value),
164    /// A `Uuid` coming from an avro Logical `uuid`.
165    Uuid(uuid::Uuid),
166}
167
168/// Any structure implementing the [ToAvro](trait.ToAvro.html) trait will be usable
169/// from a [Writer](../writer/struct.Writer.html).
170pub trait ToAvro {
171    /// Transforms this value into an Avro-compatible [`Value`].
172    fn avro(self) -> Value;
173}
174
175macro_rules! to_avro(
176    ($t:ty, $v:expr) => (
177        impl ToAvro for $t {
178            fn avro(self) -> Value {
179                $v(self)
180            }
181        }
182    );
183);
184
185to_avro!(bool, Value::Boolean);
186to_avro!(i32, Value::Int);
187to_avro!(i64, Value::Long);
188to_avro!(f32, Value::Float);
189to_avro!(f64, Value::Double);
190to_avro!(String, Value::String);
191
192impl ToAvro for () {
193    fn avro(self) -> Value {
194        Value::Null
195    }
196}
197
198impl ToAvro for usize {
199    fn avro(self) -> Value {
200        (self as i64).avro()
201    }
202}
203
204impl<'a> ToAvro for &'a str {
205    fn avro(self) -> Value {
206        Value::String(self.to_owned())
207    }
208}
209
210impl<'a> ToAvro for &'a [u8] {
211    fn avro(self) -> Value {
212        Value::Bytes(self.to_owned())
213    }
214}
215
216impl<T> ToAvro for BTreeMap<String, T>
217where
218    T: ToAvro,
219{
220    fn avro(self) -> Value {
221        Value::Map(
222            self.into_iter()
223                .map(|(key, value)| (key, value.avro()))
224                .collect::<_>(),
225        )
226    }
227}
228
229impl<'a, T> ToAvro for BTreeMap<&'a str, T>
230where
231    T: ToAvro,
232{
233    fn avro(self) -> Value {
234        Value::Map(
235            self.into_iter()
236                .map(|(key, value)| (key.to_owned(), value.avro()))
237                .collect::<_>(),
238        )
239    }
240}
241
242impl ToAvro for Value {
243    fn avro(self) -> Value {
244        self
245    }
246}
247
248/*
249impl<S: Serialize> ToAvro for S {
250    fn avro(self) -> Value {
251        use ser::Serializer;
252
253        self.serialize(&mut Serializer::new()).unwrap()
254    }
255}
256*/
257
258impl Default for Value {
259    fn default() -> Self {
260        Value::Null
261    }
262}
263
264/// Utility interface to build `Value::Record` objects.
265#[derive(Debug, Clone)]
266pub struct Record<'a> {
267    /// List of fields contained in the record.
268    /// Ordered according to the fields in the schema given to create this
269    /// `Record` object. Any unset field defaults to `Value::Null`.
270    pub fields: Vec<(String, Value)>,
271    schema_lookup: &'a BTreeMap<String, usize>,
272    schema_fields: &'a Vec<RecordField>,
273}
274
275impl<'a> Record<'a> {
276    /// Create a `Record` given a `SchemaNode`.
277    ///
278    /// If the `SchemaNode` is not a `SchemaPiece::Record` variant, `None` will be returned.
279    pub fn new(schema: SchemaNode<'a>) -> Option<Record<'a>> {
280        let ret = match schema.inner {
281            SchemaPiece::Record {
282                fields: schema_fields,
283                lookup: schema_lookup,
284                ..
285            } => {
286                let mut fields = Vec::with_capacity(schema_fields.len());
287                for schema_field in schema_fields.iter() {
288                    fields.push((schema_field.name.clone(), Value::Null));
289                }
290
291                Some(Record {
292                    fields,
293                    schema_lookup,
294                    schema_fields,
295                })
296            }
297            _ => None,
298        };
299        ret
300    }
301
302    /// Put a compatible value (implementing the `ToAvro` trait) in the
303    /// `Record` for a given `field` name.
304    ///
305    /// **NOTE** Only ensure that the field name is present in the `SchemaNode` given when creating
306    /// this `Record`. Does not perform any schema validation.
307    pub fn put<V>(&mut self, field: &str, value: V)
308    where
309        V: ToAvro,
310    {
311        if let Some(&position) = self.schema_lookup.get(field) {
312            self.fields[position].1 = value.avro()
313        }
314    }
315
316    /// Get the field description corresponding to the given name.
317    pub fn field_by_name(&self, name: &str) -> Option<&'a RecordField> {
318        self.schema_lookup
319            .get(name)
320            .map(|idx| &self.schema_fields[*idx])
321    }
322}
323
324impl<'a> ToAvro for Record<'a> {
325    fn avro(self) -> Value {
326        Value::Record(self.fields)
327    }
328}
329
330impl ToAvro for JsonValue {
331    fn avro(self) -> Value {
332        match self {
333            JsonValue::Null => Value::Null,
334            JsonValue::Bool(b) => Value::Boolean(b),
335            JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
336            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
337            JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), // TODO: Not so great
338            JsonValue::String(s) => Value::String(s),
339            JsonValue::Array(items) => {
340                Value::Array(items.into_iter().map(|item| item.avro()).collect::<_>())
341            }
342            JsonValue::Object(items) => Value::Map(
343                items
344                    .into_iter()
345                    .map(|(key, value)| (key, value.avro()))
346                    .collect::<_>(),
347            ),
348        }
349    }
350}
351
352impl Value {
353    /// Validate the value against the given [Schema](../schema/enum.Schema.html).
354    ///
355    /// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
356    /// for the full set of rules of schema validation.
357    pub fn validate(&self, schema: SchemaNode) -> bool {
358        match (self, schema.inner) {
359            (&Value::Null, SchemaPiece::Null) => true,
360            (&Value::Boolean(_), SchemaPiece::Boolean) => true,
361            (&Value::Int(_), SchemaPiece::Int) => true,
362            (&Value::Long(_), SchemaPiece::Long) => true,
363            (&Value::Float(_), SchemaPiece::Float) => true,
364            (&Value::Double(_), SchemaPiece::Double) => true,
365            (&Value::Date(_), SchemaPiece::Date) => true,
366            (&Value::Timestamp(_), SchemaPiece::TimestampMicro) => true,
367            (&Value::Timestamp(_), SchemaPiece::TimestampMilli) => true,
368            (
369                &Value::Decimal(DecimalValue {
370                    precision: vp,
371                    scale: vs,
372                    ..
373                }),
374                SchemaPiece::Decimal {
375                    precision: sp,
376                    scale: ss,
377                    fixed_size: _,
378                },
379            ) => vp == *sp && vs == *ss,
380            (&Value::Bytes(_), SchemaPiece::Bytes) => true,
381            (&Value::String(_), SchemaPiece::String) => true,
382            (&Value::Fixed(n, _), SchemaPiece::Fixed { size }) => n == *size,
383            (&Value::String(ref s), SchemaPiece::Enum { symbols, .. }) => symbols.contains(s),
384            (&Value::Enum(i, ref s), SchemaPiece::Enum { symbols, .. }) => {
385                symbols.get(i).map(|symbol| symbol == s).unwrap_or(false)
386            }
387            (
388                &Value::Union {
389                    index,
390                    ref inner,
391                    n_variants,
392                    null_variant,
393                },
394                SchemaPiece::Union(schema_inner),
395            ) => {
396                schema_inner.variants().len() > index
397                    && n_variants == schema_inner.variants().len()
398                    && inner.validate(schema.step(&schema_inner.variants()[index]))
399                    && match null_variant {
400                        None => !schema_inner
401                            .variants()
402                            .iter()
403                            .any(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
404                        Some(null_variant_idx) => {
405                            schema_inner.variants().get(null_variant_idx)
406                                == Some(&SchemaPieceOrNamed::Piece(SchemaPiece::Null))
407                        }
408                    }
409            }
410            (&Value::Array(ref items), SchemaPiece::Array(inner)) => {
411                let node = schema.step(&**inner);
412                items.iter().all(|item| item.validate(node))
413            }
414            (&Value::Map(ref items), SchemaPiece::Map(inner)) => {
415                let node = schema.step(&**inner);
416                items.iter().all(|(_, value)| value.validate(node))
417            }
418            (&Value::Record(ref record_fields), SchemaPiece::Record { fields, .. }) => {
419                fields.len() == record_fields.len()
420                    && fields.iter().zip(record_fields.iter()).all(
421                        |(field, &(ref name, ref value))| {
422                            let node = schema.step(&field.schema);
423                            field.name == *name && value.validate(node)
424                        },
425                    )
426            }
427            (Value::Json(_), SchemaPiece::Json) => true,
428            (Value::Uuid(_), SchemaPiece::Uuid) => true,
429            _ => false,
430        }
431    }
432
433    // TODO - `into_` functions for all possible Value variants (perhaps generate this using a macro?)
434    pub fn into_string(self) -> Option<String> {
435        match self {
436            Value::String(s) => Some(s),
437            _ => None,
438        }
439    }
440
441    pub fn into_nullable_bool(self) -> Option<bool> {
442        match self {
443            Value::Boolean(b) => Some(b),
444            Value::Union { inner, .. } => inner.into_nullable_bool(),
445            _ => None,
446        }
447    }
448
449    pub fn into_integral(self) -> Option<i64> {
450        match self {
451            Value::Int(i) => Some(i as i64),
452            Value::Long(l) => Some(l),
453            _ => None,
454        }
455    }
456
457    pub fn into_usize(self) -> Option<usize> {
458        self.into_integral().and_then(|i| i.try_into().ok())
459    }
460}
461
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use crate::Schema;

    use super::*;

    /// Decimal-over-bytes schema shared by the decimal checks.
    const DECIMAL_SCHEMA: &str = r#"
            {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 12,
                "scale": 5
            }
        "#;

    /// Parses a schema literal, panicking if it is malformed.
    fn parse(json: &str) -> Schema {
        Schema::from_str(json).unwrap()
    }

    /// Shorthand for building a `Value::Union`.
    fn union(index: usize, inner: Value, n_variants: usize, null_variant: Option<usize>) -> Value {
        Value::Union {
            index,
            inner: Box::new(inner),
            n_variants,
            null_variant,
        }
    }

    /// Shorthand for building a `Value::Record` from borrowed field names.
    fn record(fields: Vec<(&str, Value)>) -> Value {
        Value::Record(
            fields
                .into_iter()
                .map(|(name, value)| (name.to_string(), value))
                .collect(),
        )
    }

    /// Shorthand for a decimal with a fixed payload and scale but a chosen precision.
    fn decimal_with_precision(precision: usize) -> Value {
        Value::Decimal(DecimalValue {
            unscaled: vec![7],
            precision,
            scale: 5,
        })
    }

    #[mz_ore::test]
    fn validate() {
        // (value, schema, expected result of `validate`)
        let cases = vec![
            (Value::Int(42), "\"int\"", true),
            (Value::Int(42), "\"boolean\"", false),
            (
                union(0, Value::Null, 2, Some(0)),
                r#"["null", "int"]"#,
                true,
            ),
            (
                union(1, Value::Int(42), 2, Some(0)),
                r#"["null", "int"]"#,
                true,
            ),
            (
                union(1, Value::Null, 2, Some(1)),
                r#"["double", "int"]"#,
                false,
            ),
            (
                union(3, Value::Int(42), 4, Some(0)),
                r#"["null", "double", "string", "int"]"#,
                true,
            ),
            (
                Value::Array(vec![Value::Long(42i64)]),
                r#"{"type": "array", "items": "long"}"#,
                true,
            ),
            (
                Value::Array(vec![Value::Boolean(true)]),
                r#"{"type": "array", "items": "long"}"#,
                false,
            ),
            (Value::Record(vec![]), "\"null\"", false),
        ];

        for (value, schema_str, valid) in cases {
            let schema = parse(schema_str);
            assert_eq!(
                valid,
                value.validate(schema.top_node()),
                "Schema failed to validate against value: {} {:#?}",
                schema_str,
                value
            );
        }
    }

    #[mz_ore::test]
    fn validate_fixed() {
        let schema = parse(r#"{"type": "fixed", "size": 4, "name": "some_fixed"}"#);

        let right_size = Value::Fixed(4, vec![0, 0, 0, 0]);
        let wrong_size = Value::Fixed(5, vec![0, 0, 0, 0, 0]);

        assert!(right_size.validate(schema.top_node()));
        assert!(!wrong_size.validate(schema.top_node()));
    }

    #[mz_ore::test]
    fn validate_enum() {
        let schema = parse(r#"{"type": "enum", "name": "some_enum", "symbols": ["spades", "hearts", "diamonds", "clubs"]}"#);

        // Matching symbol, either with its position or as a bare string.
        assert!(Value::Enum(0, "spades".to_string()).validate(schema.top_node()));
        assert!(Value::String("spades".to_string()).validate(schema.top_node()));

        // Wrong position, or a symbol the schema does not know.
        assert!(!Value::Enum(1, "spades".to_string()).validate(schema.top_node()));
        assert!(!Value::String("lorem".to_string()).validate(schema.top_node()));

        // Same symbols in a different order: position 0 is no longer "spades".
        let other_schema = parse(r#"{"type": "enum", "name": "some_other_enum", "symbols": ["hearts", "diamonds", "clubs", "spades"]}"#);

        assert!(!Value::Enum(0, "spades".to_string()).validate(other_schema.top_node()));
    }

    #[mz_ore::test]
    fn validate_record() {
        let schema = parse(
            r#"{
           "type": "record",
           "fields": [
             {"type": "long", "name": "a"},
             {"type": "string", "name": "b"}
           ],
           "name": "some_record"
        }"#,
        );

        // Fields in schema order with matching types.
        let well_formed = record(vec![
            ("a", Value::Long(42i64)),
            ("b", Value::String("foo".to_string())),
        ]);
        assert!(well_formed.validate(schema.top_node()));

        // Right fields, wrong order.
        let reordered = record(vec![
            ("b", Value::String("foo".to_string())),
            ("a", Value::Long(42i64)),
        ]);
        assert!(!reordered.validate(schema.top_node()));

        // Field `a` has the wrong type.
        let mistyped = record(vec![
            ("a", Value::Boolean(false)),
            ("b", Value::String("foo".to_string())),
        ]);
        assert!(!mistyped.validate(schema.top_node()));

        // Second field has the wrong name.
        let misnamed = record(vec![
            ("a", Value::Long(42i64)),
            ("c", Value::String("foo".to_string())),
        ]);
        assert!(!misnamed.validate(schema.top_node()));

        // One field more than the schema declares.
        let oversized = record(vec![
            ("a", Value::Long(42i64)),
            ("b", Value::String("foo".to_string())),
            ("c", Value::Null),
        ]);
        assert!(!oversized.validate(schema.top_node()));
    }

    #[mz_ore::test]
    fn validate_decimal() {
        // Precision and scale both match the schema.
        assert!(decimal_with_precision(12).validate(parse(DECIMAL_SCHEMA).top_node()));

        // Precision differs from the schema.
        assert!(!decimal_with_precision(13).validate(parse(DECIMAL_SCHEMA).top_node()));
    }
}