mz_avro/lib.rs
1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! # avro
25//! **[Apache Avro](https://avro.apache.org/)** is a data serialization system which provides rich
26//! data structures and a compact, fast, binary data format.
27//!
28//! All data in Avro is schematized, as in the following example:
29//!
30//! ```text
31//! {
32//!     "type": "record",
33//!     "name": "test",
34//!     "fields": [
35//!         {"name": "a", "type": "long", "default": 42},
36//!         {"name": "b", "type": "string"}
37//!     ]
38//! }
39//! ```
40//!
41//! There are basically two ways of handling Avro data in Rust:
42//!
43//! * **as Avro-specialized data types** based on an Avro schema;
44//! * **as generic Rust types** with custom serialization logic implementing `AvroDecode`
45//!   (currently only supports deserialization, not serialization).
46//!
47//! **avro** provides a way to read and write both these data representations easily and
48//! efficiently.
49//!
50//! # Installing the library
51//!
52//!
53//! Add to your `Cargo.toml`:
54//!
55//! ```text
56//! [dependencies]
57//! avro = "x.y"
58//! ```
59//!
60//! Or in case you want to leverage the **Snappy** codec:
61//!
62//! ```text
63//! [dependencies.avro]
64//! version = "x.y"
65//! features = ["snappy"]
66//! ```
67//!
68//! # Defining a schema
69//!
70//! Avro data cannot exist without an Avro schema. Schemas **must** be used both while writing and
71//! reading and they carry the information regarding the type of data we are
72//! handling. Avro schemas are used for both schema validation and resolution of Avro data.
73//!
74//! Avro schemas are defined in **JSON** format and can just be parsed out of a raw string:
75//!
76//! ```
77//! use mz_avro::Schema;
78//!
79//! let raw_schema = r#"
80//!     {
81//!         "type": "record",
82//!         "name": "test",
83//!         "fields": [
84//!             {"name": "a", "type": "long", "default": 42},
85//!             {"name": "b", "type": "string"}
86//!         ]
87//!     }
88//! "#;
89//!
90//! // if the schema is not valid, this function will return an error
91//! let schema: Schema = raw_schema.parse().unwrap();
92//!
93//! // schemas can be printed for debugging
94//! println!("{:?}", schema);
95//! ```
96//!
97//! For more information about schemas and what kind of information you can encapsulate in them,
98//! please refer to the appropriate section of the
99//! [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas).
100//!
101//! # Writing data
102//!
103//! Once we have defined a schema, we are ready to serialize data in Avro, validating them against
104//! the provided schema in the process.
105//!
106//! **NOTE:** The library also provides a low-level interface for encoding a single datum in Avro
107//! bytecode without generating markers and headers (for advanced use), but we highly recommend the
108//! `Writer` interface to be totally Avro-compatible. Please read the API reference in case you are
109//! interested.
110//!
111//! Given that the schema we defined above is that of an Avro *Record*, we are going to use the
112//! associated type provided by the library to specify the data we want to serialize:
113//!
114//! ```
115//! # use mz_avro::Schema;
116//! use mz_avro::types::Record;
117//! use mz_avro::Writer;
118//! #
119//! # let raw_schema = r#"
120//! #     {
121//! #         "type": "record",
122//! #         "name": "test",
123//! #         "fields": [
124//! #             {"name": "a", "type": "long", "default": 42},
125//! #             {"name": "b", "type": "string"}
126//! #         ]
127//! #     }
128//! # "#;
129//! # let schema: Schema = raw_schema.parse().unwrap();
130//! // a writer needs a schema and something to write to
131//! let mut writer = Writer::new(schema.clone(), Vec::new());
132//!
133//! // the Record type models our Record schema
134//! let mut record = Record::new(schema.top_node()).unwrap();
135//! record.put("a", 27i64);
136//! record.put("b", "foo");
137//!
138//! // schema validation happens here
139//! writer.append(record).unwrap();
140//!
141//! // flushing makes sure that all data gets encoded
142//! writer.flush().unwrap();
143//!
144//! // this is how to get back the resulting avro bytecode
145//! let encoded = writer.into_inner();
146//! ```
147//!
148//! The vast majority of the time, schemas tend to define a record as a top-level container
149//! encapsulating all the values to convert as fields and providing documentation for them, but in
150//! case we want to directly define an Avro value, the library offers that capability via the
151//! `Value` interface.
152//!
153//! ```
154//! use mz_avro::types::Value;
155//!
156//! let mut value = Value::String("foo".to_string());
157//! ```
158//!
159//! ## Using codecs to compress data
160//!
161//! Avro supports three different compression codecs when encoding data:
162//!
163//! * **Null**: leaves data uncompressed;
164//! * **Deflate**: writes the data block using the deflate algorithm as specified in RFC 1951, and
165//! typically implemented using the zlib library. Note that this format (unlike the "zlib format" in
166//! RFC 1950) does not have a checksum.
167//! * **Snappy**: uses Google's [Snappy](http://google.github.io/snappy/) compression library. Each
168//! compressed block is followed by the 4-byte, big-endianCRC32 checksum of the uncompressed data in
169//! the block. You must enable the `snappy` feature to use this codec.
170//!
171//! To specify a codec to use to compress data, just specify it while creating a `Writer`:
172//! ```
173//! # use mz_avro::Schema;
174//! use mz_avro::Writer;
175//! use mz_avro::Codec;
176//! #
177//! # let raw_schema = r#"
178//! #     {
179//! #         "type": "record",
180//! #         "name": "test",
181//! #         "fields": [
182//! #             {"name": "a", "type": "long", "default": 42},
183//! #             {"name": "b", "type": "string"}
184//! #         ]
185//! #     }
186//! # "#;
187//! # let schema: Schema = raw_schema.parse().unwrap();
188//! let mut writer = Writer::with_codec(schema, Vec::new(), Codec::Deflate);
189//! ```
190//!
191//! # Reading data
192//!
193//! As far as reading Avro encoded data goes, we can just use the schema encoded with the data to
194//! read them. The library will do it automatically for us, as it already does for the compression
195//! codec:
196//!
197//! ```
198//!
199//! use mz_avro::Reader;
200//! # use mz_avro::Schema;
201//! # use mz_avro::types::Record;
202//! # use mz_avro::Writer;
203//! #
204//! # let raw_schema = r#"
205//! #     {
206//! #         "type": "record",
207//! #         "name": "test",
208//! #         "fields": [
209//! #             {"name": "a", "type": "long", "default": 42},
210//! #             {"name": "b", "type": "string"}
211//! #         ]
212//! #     }
213//! # "#;
214//! # let schema: Schema = raw_schema.parse().unwrap();
215//! # let mut writer = Writer::new(schema.clone(), Vec::new());
216//! # let mut record = Record::new(schema.top_node()).unwrap();
217//! # record.put("a", 27i64);
218//! # record.put("b", "foo");
219//! # writer.append(record).unwrap();
220//! # writer.flush().unwrap();
221//! # let input = writer.into_inner();
222//! // reader creation can fail in case the input to read from is not Avro-compatible or malformed
223//! let reader = Reader::new(&input[..]).unwrap();
224//! ```
225//!
226//! In case, instead, we want to specify a different (but compatible) reader schema from the schema
227//! the data has been written with, we can just do as the following:
228//! ```
229//! use mz_avro::Schema;
230//! use mz_avro::Reader;
231//! # use mz_avro::types::Record;
232//! # use mz_avro::Writer;
233//! #
234//! # let writer_raw_schema = r#"
235//! #     {
236//! #         "type": "record",
237//! #         "name": "test",
238//! #         "fields": [
239//! #             {"name": "a", "type": "long", "default": 42},
240//! #             {"name": "b", "type": "string"}
241//! #         ]
242//! #     }
243//! # "#;
244//! # let writer_schema: Schema = writer_raw_schema.parse().unwrap();
245//! # let mut writer = Writer::new(writer_schema.clone(), Vec::new());
246//! # let mut record = Record::new(writer_schema.top_node()).unwrap();
247//! # record.put("a", 27i64);
248//! # record.put("b", "foo");
249//! # writer.append(record).unwrap();
250//! # writer.flush().unwrap();
251//! # let input = writer.into_inner();
252//!
253//! let reader_raw_schema = r#"
254//!     {
255//!         "type": "record",
256//!         "name": "test",
257//!         "fields": [
258//!             {"name": "a", "type": "long", "default": 42},
259//!             {"name": "b", "type": "string"},
260//!             {"name": "c", "type": "long", "default": 43}
261//!         ]
262//!     }
263//! "#;
264//!
265//! let reader_schema: Schema = reader_raw_schema.parse().unwrap();
266//!
267//! // reader creation can fail in case the input to read from is not Avro-compatible or malformed
268//! let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
269//! ```
270//!
271//! The library will also automatically perform schema resolution while reading the data.
272//!
273//! For more information about schema compatibility and resolution, please refer to the
274//! [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas).
275//!
276//! There are two ways to handle deserializing Avro data in Rust, as you can see below.
277//!
278//! **NOTE:** The library also provides a low-level interface for decoding a single datum in Avro
279//! bytecode without markers and header (for advanced use), but we highly recommend the `Reader`
280//! interface to leverage all Avro features. Please read the API reference in case you are
281//! interested.
282//!
283//!
284//! ## The avro way
285//!
286//! We can just read directly instances of `Value` out of the `Reader` iterator:
287//!
288//! ```
289//! # use mz_avro::Schema;
290//! # use mz_avro::types::Record;
291//! # use mz_avro::Writer;
292//! use mz_avro::Reader;
293//! #
294//! # let raw_schema = r#"
295//! #     {
296//! #         "type": "record",
297//! #         "name": "test",
298//! #         "fields": [
299//! #             {"name": "a", "type": "long", "default": 42},
300//! #             {"name": "b", "type": "string"}
301//! #         ]
302//! #     }
303//! # "#;
304//! # let schema: Schema = raw_schema.parse().unwrap();
305//! # let mut writer = Writer::new(schema.clone(), Vec::new());
306//! # let mut record = Record::new(schema.top_node()).unwrap();
307//! # record.put("a", 27i64);
308//! # record.put("b", "foo");
309//! # writer.append(record).unwrap();
310//! # writer.flush().unwrap();
311//! # let input = writer.into_inner();
312//! let mut reader = Reader::new(&input[..]).unwrap();
313//!
314//! // value is a Result of an Avro Value in case the read operation fails
315//! for value in reader {
316//!     println!("{:?}", value.unwrap());
317//! }
318//!
319//! ```
320//!
321//! ## Custom deserialization (advanced)
322//!
323//! It is possible to avoid the intermediate stage of decoding to `Value`,
324//! by implementing `AvroDecode` for one or more structs that will determine how to decode various schema pieces.
325//!
326//! This API is in flux, and more complete documentation is coming soon. For now,
327//! [Materialize](https://github.com/MaterializeInc/materialize/blob/main/src/interchange/src/avro.rs)
328//! furnishes the most complete example.
329
330// TODO(benesch): remove this once this crate no longer makes use of potentially
331// dangerous `as` conversions.
332#![allow(clippy::as_conversions)]
333
334mod codec;
335mod decode;
336mod reader;
337mod util;
338mod writer;
339
340pub mod encode;
341pub mod error;
342pub mod schema;
343pub mod types;
344
345pub use crate::codec::Codec;
346pub use crate::decode::public_decoders::*;
347pub use crate::decode::{
348    AvroArrayAccess, AvroDecodable, AvroDecode, AvroDeserializer, AvroFieldAccess, AvroMapAccess,
349    AvroRead, AvroRecordAccess, GeneralDeserializer, Skip, StatefulAvroDecodable, ValueOrReader,
350    give_value,
351};
352pub use crate::encode::encode as encode_unchecked;
353pub use crate::reader::{Block, BlockIter, Reader, from_avro_datum};
354pub use crate::schema::{ParseSchemaError, Schema};
355pub use crate::types::SchemaResolutionError;
356pub use crate::writer::{ValidationError, Writer, to_avro_datum, write_avro_datum};
357
358#[cfg(test)]
359mod tests {
360    use std::str::FromStr;
361
362    use mz_ore::{assert_err, assert_none};
363
364    use crate::reader::Reader;
365    use crate::schema::Schema;
366    use crate::types::{Record, Value};
367
368    use super::*;
369
370    //TODO: move where it fits better
371    #[mz_ore::test]
372    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
373    fn test_enum_default() {
374        let writer_raw_schema = r#"
375            {
376                "type": "record",
377                "name": "test",
378                "fields": [
379                    {"name": "a", "type": "long", "default": 42},
380                    {"name": "b", "type": "string"}
381                ]
382            }
383        "#;
384        let reader_raw_schema = r#"
385            {
386                "type": "record",
387                "name": "test",
388                "fields": [
389                    {"name": "a", "type": "long", "default": 42},
390                    {"name": "b", "type": "string"},
391                    {
392                        "name": "c",
393                        "type": {
394                            "type": "enum",
395                            "name": "suit",
396                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
397                        },
398                        "default": "spades"
399                    }
400                ]
401            }
402        "#;
403        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
404        let reader_schema = Schema::from_str(reader_raw_schema).unwrap();
405        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
406        let mut record = Record::new(writer_schema.top_node()).unwrap();
407        record.put("a", 27i64);
408        record.put("b", "foo");
409        writer.append(record).unwrap();
410        writer.flush().unwrap();
411        let input = writer.into_inner();
412        let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
413        assert_eq!(
414            reader.next().unwrap().unwrap(),
415            Value::Record(vec![
416                ("a".to_string(), Value::Long(27)),
417                ("b".to_string(), Value::String("foo".to_string())),
418                ("c".to_string(), Value::Enum(1, "spades".to_string())),
419            ])
420        );
421        assert_none!(reader.next());
422    }
423
424    //TODO: move where it fits better
425    #[mz_ore::test]
426    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
427    fn test_enum_string_value() {
428        let raw_schema = r#"
429            {
430                "type": "record",
431                "name": "test",
432                "fields": [
433                    {"name": "a", "type": "long", "default": 42},
434                    {"name": "b", "type": "string"},
435                    {
436                        "name": "c",
437                        "type": {
438                            "type": "enum",
439                            "name": "suit",
440                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
441                        },
442                        "default": "spades"
443                    }
444                ]
445            }
446        "#;
447        let schema = Schema::from_str(raw_schema).unwrap();
448        let mut writer = Writer::with_codec(schema.clone(), Vec::new(), Codec::Null);
449        let mut record = Record::new(schema.top_node()).unwrap();
450        record.put("a", 27i64);
451        record.put("b", "foo");
452        record.put("c", "clubs");
453        writer.append(record).unwrap();
454        writer.flush().unwrap();
455        let input = writer.into_inner();
456        let mut reader = Reader::with_schema(&schema, &input[..]).unwrap();
457        assert_eq!(
458            reader.next().unwrap().unwrap(),
459            Value::Record(vec![
460                ("a".to_string(), Value::Long(27)),
461                ("b".to_string(), Value::String("foo".to_string())),
462                ("c".to_string(), Value::Enum(2, "clubs".to_string())),
463            ])
464        );
465        assert_none!(reader.next());
466    }
467
468    //TODO: move where it fits better
469    #[mz_ore::test]
470    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
471    fn test_enum_resolution() {
472        let writer_raw_schema = r#"
473            {
474                "type": "record",
475                "name": "test",
476                "fields": [
477                    {"name": "a", "type": "long", "default": 42},
478                    {"name": "b", "type": "string"},
479                    {
480                        "name": "c",
481                        "type": {
482                            "type": "enum",
483                            "name": "suit",
484                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
485                        },
486                        "default": "spades"
487                    }
488                ]
489            }
490        "#;
491        let reader_raw_schema = r#"
492            {
493                "type": "record",
494                "name": "test",
495                "fields": [
496                    {"name": "a", "type": "long", "default": 42},
497                    {"name": "b", "type": "string"},
498                    {
499                        "name": "c",
500                        "type": {
501                            "type": "enum",
502                            "name": "suit",
503                            "symbols": ["diamonds", "spades", "ninja", "hearts"]
504                        },
505                        "default": "spades"
506                    }
507                ]
508            }
509        "#;
510        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
511        let reader_schema = Schema::from_str(reader_raw_schema).unwrap();
512        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
513        let mut record = Record::new(writer_schema.top_node()).unwrap();
514        record.put("a", 27i64);
515        record.put("b", "foo");
516        record.put("c", "clubs");
517        writer.append(record).unwrap();
518        writer.flush().unwrap();
519        let input = writer.into_inner();
520        let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
521        assert_err!(reader.next().unwrap());
522        assert_none!(reader.next());
523    }
524
525    //TODO: move where it fits better
526    #[mz_ore::test]
527    fn test_enum_no_reader_schema() {
528        let writer_raw_schema = r#"
529            {
530                "type": "record",
531                "name": "test",
532                "fields": [
533                    {"name": "a", "type": "long", "default": 42},
534                    {"name": "b", "type": "string"},
535                    {
536                        "name": "c",
537                        "type": {
538                            "type": "enum",
539                            "name": "suit",
540                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
541                        },
542                        "default": "spades"
543                    }
544                ]
545            }
546        "#;
547        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
548        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
549        let mut record = Record::new(writer_schema.top_node()).unwrap();
550        record.put("a", 27i64);
551        record.put("b", "foo");
552        record.put("c", "clubs");
553        writer.append(record).unwrap();
554        writer.flush().unwrap();
555        let input = writer.into_inner();
556        let mut reader = Reader::new(&input[..]).unwrap();
557        assert_eq!(
558            reader.next().unwrap().unwrap(),
559            Value::Record(vec![
560                ("a".to_string(), Value::Long(27)),
561                ("b".to_string(), Value::String("foo".to_string())),
562                ("c".to_string(), Value::Enum(2, "clubs".to_string())),
563            ])
564        );
565    }
566    #[mz_ore::test]
567    fn test_datetime_value() {
568        let writer_raw_schema = r#"{
569        "type": "record",
570        "name": "dttest",
571        "fields": [
572            {
573                "name": "a",
574                "type": {
575                    "type": "long",
576                    "logicalType": "timestamp-micros"
577                }
578            }
579        ]}"#;
580        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
581        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
582        let mut record = Record::new(writer_schema.top_node()).unwrap();
583        let dt = chrono::DateTime::from_timestamp(1_000, 995_000_000)
584            .unwrap()
585            .naive_utc();
586        record.put("a", types::Value::Timestamp(dt));
587        writer.append(record).unwrap();
588        writer.flush().unwrap();
589        let input = writer.into_inner();
590        let mut reader = Reader::new(&input[..]).unwrap();
591        assert_eq!(
592            reader.next().unwrap().unwrap(),
593            Value::Record(vec![("a".to_string(), Value::Timestamp(dt)),])
594        );
595    }
596
597    #[mz_ore::test]
598    fn test_malformed_length() {
599        let raw_schema = r#"
600            {
601                "type": "record",
602                "name": "test",
603                "fields": [
604                    {"name": "a", "type": "long", "default": 42},
605                    {"name": "b", "type": "string"}
606                ]
607            }
608        "#;
609
610        let schema = Schema::from_str(raw_schema).unwrap();
611
612        // Would allocated 18446744073709551605 bytes
613        let malformed: &[u8] = &[0x3e, 0x15, 0xff, 0x1f, 0x15, 0xff];
614
615        let value = from_avro_datum(&schema, &mut &malformed[..]);
616        assert_err!(value);
617    }
618}