mz_avro/
types.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic handling the intermediate representation of Avro values.
25
26use std::collections::BTreeMap;
27use std::fmt;
28
29use chrono::NaiveDateTime;
30use enum_kinds::EnumKind;
31use itertools::Itertools;
32use serde_json::Value as JsonValue;
33
34use crate::schema::{RecordField, SchemaNode, SchemaPiece, SchemaPieceOrNamed};
35
/// Describes an error that occurred while performing schema resolution on
/// Avro data.
///
/// The wrapped `String` is the error message; it is exactly what the
/// `Display` implementation prints.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SchemaResolutionError(pub String);

impl SchemaResolutionError {
    /// Builds an error from anything that converts into a `String`.
    pub fn new<S: Into<String>>(msg: S) -> SchemaResolutionError {
        let message: String = msg.into();
        SchemaResolutionError(message)
    }
}

impl fmt::Display for SchemaResolutionError {
    /// Prints the wrapped message by delegating to `String`'s own `Display`
    /// implementation with the caller's formatter.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let SchemaResolutionError(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for SchemaResolutionError {}
56
/// The payload carried by a [`Value::Decimal`].
///
/// [`Value::validate`] accepts a decimal only when both `precision` and
/// `scale` equal the values declared by the decimal schema.
#[derive(Clone, Debug, PartialEq)]
pub struct DecimalValue {
    /// An unscaled two's-complement integer value in big-endian byte order.
    pub unscaled: Vec<u8>,
    /// The precision of the decimal; checked against the schema's precision
    /// during validation.
    pub precision: usize,
    /// The scale of the decimal: the represented number is
    /// `unscaled × 10^-scale`.
    pub scale: usize,
}
64
/// A `Copy`-able Avro value restricted to the fixed-length types.
///
/// Each variant has a same-named counterpart in [`Value`], and the
/// `From<Scalar> for Value` conversion maps one onto the other.
#[derive(Clone, Copy, Debug, PartialEq, EnumKind)] // Can't be Eq because there are floats
#[enum_kind(ScalarKind)]
pub enum Scalar {
    /// A `null` value.
    Null,
    /// A `boolean` value.
    Boolean(bool),
    /// An `int` value.
    Int(i32),
    /// A `long` value.
    Long(i64),
    /// A `float` value.
    Float(f32),
    /// A `double` value.
    Double(f64),
    /// A date; converts to [`Value::Date`], which counts days since the Unix
    /// epoch.
    Date(i32),
    /// A timestamp; converts to [`Value::Timestamp`].
    Timestamp(NaiveDateTime),
}
77
78impl From<Scalar> for Value {
79    fn from(s: Scalar) -> Self {
80        match s {
81            Scalar::Null => Value::Null,
82            Scalar::Boolean(v) => Value::Boolean(v),
83            Scalar::Int(v) => Value::Int(v),
84            Scalar::Long(v) => Value::Long(v),
85            Scalar::Float(v) => Value::Float(v),
86            Scalar::Double(v) => Value::Double(v),
87            Scalar::Date(v) => Value::Date(v),
88            Scalar::Timestamp(v) => Value::Timestamp(v),
89        }
90    }
91}
92
/// Represents any valid Avro value.
///
/// More information about Avro values can be found in the
/// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas).
#[derive(Clone, Debug, PartialEq)]
pub enum Value {
    // Fixed-length types
    /// A `null` Avro value.
    Null,
    /// A `boolean` Avro value.
    Boolean(bool),
    /// An `int` Avro value.
    Int(i32),
    /// A `long` Avro value.
    Long(i64),
    /// A `float` Avro value.
    Float(f32),
    /// A `double` Avro value.
    Double(f64),
    /// A `Date` coming from an Avro logical `Date`, which is an i32 number of
    /// days since the Unix epoch.
    Date(i32),
    /// A `DateTime` coming from an Avro logical `Timestamp`.
    Timestamp(NaiveDateTime),

    // Variable-length types
    /// A `decimal` Avro value.
    ///
    /// The value of the decimal can be computed as follows:
    /// <em>unscaled</em> × 10<sup>-<em>scale</em></sup>.
    Decimal(DecimalValue),
    /// A `bytes` Avro value.
    Bytes(Vec<u8>),
    /// A `string` Avro value.
    String(String),
    /// A `fixed` Avro value.
    /// The size of the fixed value is represented as a `usize`.
    Fixed(usize, Vec<u8>),
    /// An `enum` Avro value.
    ///
    /// An Enum is represented by a symbol and its position in the symbols list
    /// of its corresponding schema.
    /// This allows schema-less encoding, as well as schema resolution while
    /// reading values.
    Enum(usize, String),
    /// A `union` Avro value.
    Union {
        /// The index of this variant in the reader schema.
        index: usize,
        /// The value of the variant.
        inner: Box<Value>,
        // The next two metadata fields are necessary for the Materialize "flattened unions" decoding strategy to work properly.
        /// The number of variants in the reader schema.
        n_variants: usize,
        /// Which variant is null in the reader schema, if any.
        null_variant: Option<usize>,
    },
    /// An `array` Avro value.
    Array(Vec<Value>),
    /// A `map` Avro value.
    Map(BTreeMap<String, Value>),
    /// A `record` Avro value.
    ///
    /// A Record is represented by a vector of (`<field name>`, `value`).
    /// This allows schema-less encoding.
    ///
    /// See [`Record`] for a more user-friendly way to build one.
    Record(Vec<(String, Value)>),
    /// A `string` Avro value that has been interpreted as JSON.
    ///
    /// This is not part of the Avro spec, but is emitted by Debezium,
    /// and distinguished by setting the `"connect.name"` property to `"io.debezium.data.Json"`.
    Json(serde_json::Value),
    /// A `Uuid` coming from an Avro logical `uuid`.
    Uuid(uuid::Uuid),
}
168
/// Conversion of a Rust value into the intermediate Avro [`Value`]
/// representation.
///
/// Any structure implementing the [`ToAvro`] trait will be usable
/// from a [Writer](../writer/struct.Writer.html).
pub trait ToAvro {
    /// Transforms this value into an Avro-compatible [`Value`].
    ///
    /// Takes `self` by value, so the conversion consumes its input.
    fn avro(self) -> Value;
}
175
176macro_rules! to_avro(
177    ($t:ty, $v:expr) => (
178        impl ToAvro for $t {
179            fn avro(self) -> Value {
180                $v(self)
181            }
182        }
183    );
184);
185
186to_avro!(bool, Value::Boolean);
187to_avro!(i32, Value::Int);
188to_avro!(i64, Value::Long);
189to_avro!(f32, Value::Float);
190to_avro!(f64, Value::Double);
191to_avro!(String, Value::String);
192
193impl ToAvro for () {
194    fn avro(self) -> Value {
195        Value::Null
196    }
197}
198
199impl ToAvro for usize {
200    fn avro(self) -> Value {
201        (self as i64).avro()
202    }
203}
204
205impl<'a> ToAvro for &'a str {
206    fn avro(self) -> Value {
207        Value::String(self.to_owned())
208    }
209}
210
211impl<'a> ToAvro for &'a [u8] {
212    fn avro(self) -> Value {
213        Value::Bytes(self.to_owned())
214    }
215}
216
217impl<T> ToAvro for BTreeMap<String, T>
218where
219    T: ToAvro,
220{
221    fn avro(self) -> Value {
222        Value::Map(
223            self.into_iter()
224                .map(|(key, value)| (key, value.avro()))
225                .collect::<_>(),
226        )
227    }
228}
229
230impl<'a, T> ToAvro for BTreeMap<&'a str, T>
231where
232    T: ToAvro,
233{
234    fn avro(self) -> Value {
235        Value::Map(
236            self.into_iter()
237                .map(|(key, value)| (key.to_owned(), value.avro()))
238                .collect::<_>(),
239        )
240    }
241}
242
243impl ToAvro for Value {
244    fn avro(self) -> Value {
245        self
246    }
247}
248
249/*
250impl<S: Serialize> ToAvro for S {
251    fn avro(self) -> Value {
252        use ser::Serializer;
253
254        self.serialize(&mut Serializer::new()).unwrap()
255    }
256}
257*/
258
259impl Default for Value {
260    fn default() -> Self {
261        Value::Null
262    }
263}
264
/// Utility interface to build `Value::Record` objects.
///
/// A `Record` borrows the field list and the name lookup table of the record
/// schema it was created from.
#[derive(Debug, Clone)]
pub struct Record<'a> {
    /// List of fields contained in the record.
    /// Ordered according to the fields in the schema given to create this
    /// `Record` object. Any unset field defaults to `Value::Null`.
    pub fields: Vec<(String, Value)>,
    // Maps a field name to the index used for both `fields` and
    // `schema_fields`.
    schema_lookup: &'a BTreeMap<String, usize>,
    // The field descriptions of the record schema, in schema order.
    schema_fields: &'a Vec<RecordField>,
}
275
276impl<'a> Record<'a> {
277    /// Create a `Record` given a `SchemaNode`.
278    ///
279    /// If the `SchemaNode` is not a `SchemaPiece::Record` variant, `None` will be returned.
280    pub fn new(schema: SchemaNode<'a>) -> Option<Record<'a>> {
281        let ret = match schema.inner {
282            SchemaPiece::Record {
283                fields: schema_fields,
284                lookup: schema_lookup,
285                ..
286            } => {
287                let mut fields = Vec::with_capacity(schema_fields.len());
288                for schema_field in schema_fields.iter() {
289                    fields.push((schema_field.name.clone(), Value::Null));
290                }
291
292                Some(Record {
293                    fields,
294                    schema_lookup,
295                    schema_fields,
296                })
297            }
298            _ => None,
299        };
300        ret
301    }
302
303    /// Put a compatible value (implementing the `ToAvro` trait) in the
304    /// `Record` for a given `field` name.
305    ///
306    /// **NOTE** Only ensure that the field name is present in the `SchemaNode` given when creating
307    /// this `Record`. Does not perform any schema validation.
308    pub fn put<V>(&mut self, field: &str, value: V)
309    where
310        V: ToAvro,
311    {
312        if let Some(&position) = self.schema_lookup.get(field) {
313            self.fields[position].1 = value.avro()
314        }
315    }
316
317    /// Get the field description corresponding to the given name.
318    pub fn field_by_name(&self, name: &str) -> Option<&'a RecordField> {
319        self.schema_lookup
320            .get(name)
321            .map(|idx| &self.schema_fields[*idx])
322    }
323}
324
325impl<'a> ToAvro for Record<'a> {
326    fn avro(self) -> Value {
327        Value::Record(self.fields)
328    }
329}
330
331impl ToAvro for JsonValue {
332    fn avro(self) -> Value {
333        match self {
334            JsonValue::Null => Value::Null,
335            JsonValue::Bool(b) => Value::Boolean(b),
336            JsonValue::Number(ref n) if n.is_i64() => Value::Long(n.as_i64().unwrap()),
337            JsonValue::Number(ref n) if n.is_f64() => Value::Double(n.as_f64().unwrap()),
338            JsonValue::Number(n) => Value::Long(n.as_u64().unwrap() as i64), // TODO: Not so great
339            JsonValue::String(s) => Value::String(s),
340            JsonValue::Array(items) => {
341                Value::Array(items.into_iter().map(|item| item.avro()).collect::<_>())
342            }
343            JsonValue::Object(items) => Value::Map(
344                items
345                    .into_iter()
346                    .map(|(key, value)| (key, value.avro()))
347                    .collect::<_>(),
348            ),
349        }
350    }
351}
352
353impl Value {
354    /// Validate the value against the given [Schema](../schema/enum.Schema.html).
355    ///
356    /// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
357    /// for the full set of rules of schema validation.
358    pub fn validate(&self, schema: SchemaNode) -> bool {
359        match (self, schema.inner) {
360            (&Value::Null, SchemaPiece::Null) => true,
361            (&Value::Boolean(_), SchemaPiece::Boolean) => true,
362            (&Value::Int(_), SchemaPiece::Int) => true,
363            (&Value::Long(_), SchemaPiece::Long) => true,
364            (&Value::Float(_), SchemaPiece::Float) => true,
365            (&Value::Double(_), SchemaPiece::Double) => true,
366            (&Value::Date(_), SchemaPiece::Date) => true,
367            (&Value::Timestamp(_), SchemaPiece::TimestampMicro) => true,
368            (&Value::Timestamp(_), SchemaPiece::TimestampMilli) => true,
369            (
370                &Value::Decimal(DecimalValue {
371                    precision: vp,
372                    scale: vs,
373                    ..
374                }),
375                SchemaPiece::Decimal {
376                    precision: sp,
377                    scale: ss,
378                    fixed_size: _,
379                },
380            ) => vp == *sp && vs == *ss,
381            (&Value::Bytes(_), SchemaPiece::Bytes) => true,
382            (&Value::String(_), SchemaPiece::String) => true,
383            (&Value::Fixed(n, _), SchemaPiece::Fixed { size }) => n == *size,
384            (&Value::String(ref s), SchemaPiece::Enum { symbols, .. }) => symbols.contains(s),
385            (&Value::Enum(i, ref s), SchemaPiece::Enum { symbols, .. }) => {
386                symbols.get(i).map(|symbol| symbol == s).unwrap_or(false)
387            }
388            (
389                &Value::Union {
390                    index,
391                    ref inner,
392                    n_variants,
393                    null_variant,
394                },
395                SchemaPiece::Union(schema_inner),
396            ) => {
397                schema_inner.variants().len() > index
398                    && n_variants == schema_inner.variants().len()
399                    && inner.validate(schema.step(&schema_inner.variants()[index]))
400                    && match null_variant {
401                        None => !schema_inner
402                            .variants()
403                            .iter()
404                            .any(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null)),
405                        Some(null_variant_idx) => {
406                            schema_inner.variants().get(null_variant_idx)
407                                == Some(&SchemaPieceOrNamed::Piece(SchemaPiece::Null))
408                        }
409                    }
410            }
411            (&Value::Array(ref items), SchemaPiece::Array(inner)) => {
412                let node = schema.step(&**inner);
413                items.iter().all(|item| item.validate(node))
414            }
415            (&Value::Map(ref items), SchemaPiece::Map(inner)) => {
416                let node = schema.step(&**inner);
417                items.iter().all(|(_, value)| value.validate(node))
418            }
419            (&Value::Record(ref record_fields), SchemaPiece::Record { fields, .. }) => {
420                fields.len() == record_fields.len()
421                    && fields.iter().zip_eq(record_fields.iter()).all(
422                        |(field, &(ref name, ref value))| {
423                            let node = schema.step(&field.schema);
424                            field.name == *name && value.validate(node)
425                        },
426                    )
427            }
428            (Value::Json(_), SchemaPiece::Json) => true,
429            (Value::Uuid(_), SchemaPiece::Uuid) => true,
430            _ => false,
431        }
432    }
433
434    // TODO - `into_` functions for all possible Value variants (perhaps generate this using a macro?)
435    pub fn into_string(self) -> Option<String> {
436        match self {
437            Value::String(s) => Some(s),
438            _ => None,
439        }
440    }
441
442    pub fn into_nullable_bool(self) -> Option<bool> {
443        match self {
444            Value::Boolean(b) => Some(b),
445            Value::Union { inner, .. } => inner.into_nullable_bool(),
446            _ => None,
447        }
448    }
449
450    pub fn into_integral(self) -> Option<i64> {
451        match self {
452            Value::Int(i) => Some(i as i64),
453            Value::Long(l) => Some(l),
454            _ => None,
455        }
456    }
457
458    pub fn into_usize(self) -> Option<usize> {
459        self.into_integral().and_then(|i| i.try_into().ok())
460    }
461}
462
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use crate::Schema;

    use super::*;

    /// Parses a JSON schema definition, panicking if it is malformed.
    fn parse_schema(json: &str) -> Schema {
        Schema::from_str(json).unwrap()
    }

    /// Builds a `Value::Union` fixture without the struct-literal boilerplate.
    fn union_value(
        index: usize,
        inner: Value,
        n_variants: usize,
        null_variant: Option<usize>,
    ) -> Value {
        Value::Union {
            index,
            inner: Box::new(inner),
            n_variants,
            null_variant,
        }
    }

    /// Builds a `Value::Record` fixture from borrowed field names.
    fn record_value(fields: Vec<(&str, Value)>) -> Value {
        Value::Record(
            fields
                .into_iter()
                .map(|(name, value)| (name.to_string(), value))
                .collect(),
        )
    }

    #[mz_ore::test]
    fn validate() {
        let nullable_int = r#"["null", "int"]"#;
        let long_array = r#"{"type": "array", "items": "long"}"#;

        // (value, schema, expected validation outcome)
        let value_schema_valid = vec![
            (Value::Int(42), "\"int\"", true),
            (Value::Int(42), "\"boolean\"", false),
            (union_value(0, Value::Null, 2, Some(0)), nullable_int, true),
            (
                union_value(1, Value::Int(42), 2, Some(0)),
                nullable_int,
                true,
            ),
            (
                union_value(1, Value::Null, 2, Some(1)),
                r#"["double", "int"]"#,
                false,
            ),
            (
                union_value(3, Value::Int(42), 4, Some(0)),
                r#"["null", "double", "string", "int"]"#,
                true,
            ),
            (Value::Array(vec![Value::Long(42i64)]), long_array, true),
            (Value::Array(vec![Value::Boolean(true)]), long_array, false),
            (Value::Record(vec![]), "\"null\"", false),
        ];

        for (value, schema_str, valid) in value_schema_valid {
            let schema = parse_schema(schema_str);
            let outcome = value.validate(schema.top_node());
            assert_eq!(
                valid, outcome,
                "Schema failed to validate against value: {} {:#?}",
                schema_str, value
            );
        }
    }

    #[mz_ore::test]
    fn validate_fixed() {
        let schema = parse_schema(r#"{"type": "fixed", "size": 4, "name": "some_fixed"}"#);

        let matching = Value::Fixed(4, vec![0, 0, 0, 0]);
        let too_long = Value::Fixed(5, vec![0, 0, 0, 0, 0]);

        assert!(matching.validate(schema.top_node()));
        assert!(!too_long.validate(schema.top_node()));
    }

    #[mz_ore::test]
    fn validate_enum() {
        let schema = parse_schema(r#"{"type": "enum", "name": "some_enum", "symbols": ["spades", "hearts", "diamonds", "clubs"]}"#);
        let spades = || "spades".to_string();

        // Accepted: the right index for the symbol, or the bare symbol name.
        assert!(Value::Enum(0, spades()).validate(schema.top_node()));
        assert!(Value::String(spades()).validate(schema.top_node()));

        // Rejected: wrong index for the symbol, or an unknown symbol.
        assert!(!Value::Enum(1, spades()).validate(schema.top_node()));
        assert!(!Value::String("lorem".to_string()).validate(schema.top_node()));

        // The same symbol sits at a different position in this schema.
        let other_schema = parse_schema(r#"{"type": "enum", "name": "some_other_enum", "symbols": ["hearts", "diamonds", "clubs", "spades"]}"#);

        assert!(!Value::Enum(0, spades()).validate(other_schema.top_node()));
    }

    #[mz_ore::test]
    fn validate_record() {
        let schema = parse_schema(
            r#"{
           "type": "record",
           "fields": [
             {"type": "long", "name": "a"},
             {"type": "string", "name": "b"}
           ],
           "name": "some_record"
        }"#,
        );
        let foo = || Value::String("foo".to_string());

        // Matches the schema exactly.
        let well_formed = record_value(vec![("a", Value::Long(42i64)), ("b", foo())]);
        assert!(well_formed.validate(schema.top_node()));

        // Fields in the wrong order.
        let reordered = record_value(vec![("b", foo()), ("a", Value::Long(42i64))]);
        assert!(!reordered.validate(schema.top_node()));

        // Field `a` has the wrong type.
        let wrong_type = record_value(vec![("a", Value::Boolean(false)), ("b", foo())]);
        assert!(!wrong_type.validate(schema.top_node()));

        // Second field has the wrong name.
        let wrong_name = record_value(vec![("a", Value::Long(42i64)), ("c", foo())]);
        assert!(!wrong_name.validate(schema.top_node()));

        // One field too many.
        let extra_field = record_value(vec![
            ("a", Value::Long(42i64)),
            ("b", foo()),
            ("c", Value::Null),
        ]);
        assert!(!extra_field.validate(schema.top_node()));
    }

    #[mz_ore::test]
    fn validate_decimal() {
        let schema_json = r#"
            {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 12,
                "scale": 5
            }
        "#;
        let decimal_with_precision = |precision: usize| {
            Value::Decimal(DecimalValue {
                unscaled: vec![7],
                precision,
                scale: 5,
            })
        };

        // Precision and scale both match the schema.
        assert!(decimal_with_precision(12).validate(parse_schema(schema_json).top_node()));

        // Precision differs from the schema.
        assert!(!decimal_with_precision(13).validate(parse_schema(schema_json).top_node()));
    }
}