// Copyright 2018 Flavien Raynaud.
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file is derived from the avro-rs project, available at
// https://github.com/flavray/avro-rs. It was incorporated
// directly into Materialize on March 3, 2020.
//
// The original source code is subject to the terms of the MIT license, a copy
// of which can be found in the LICENSE file at the root of this repository.

//! # avro
//! **[Apache Avro](https://avro.apache.org/)** is a data serialization system which provides rich
//! data structures and a compact, fast, binary data format.
//!
//! All data in Avro is schematized, as in the following example:
//!
//! ```text
//! {
//!     "type": "record",
//!     "name": "test",
//!     "fields": [
//!         {"name": "a", "type": "long", "default": 42},
//!         {"name": "b", "type": "string"}
//!     ]
//! }
//! ```
//!
//! There are two ways of handling Avro data in Rust:
//!
//! * **as Avro-specialized data types** based on an Avro schema;
//! * **as generic Rust types** with custom deserialization logic implementing `AvroDecode`
//!   (currently only deserialization is supported, not serialization).
//!
//! **avro** provides a way to read and write both these data representations easily and
//! efficiently.
//!
//! # Installing the library
//!
//! Add to your `Cargo.toml`:
//!
//! ```text
//! [dependencies]
//! mz_avro = "x.y"
//! ```
//!
//! Or in case you want to leverage the **Snappy** codec:
//!
//! ```text
//! [dependencies.mz_avro]
//! version = "x.y"
//! features = ["snappy"]
//! ```
//!
//! # Defining a schema
//!
//! Avro data cannot exist without an Avro schema. Schemas **must** be used both when writing and
//! when reading, and they carry the information about the type of data we are handling. Avro
//! schemas are used for both schema validation and resolution of Avro data.
//!
//! Avro schemas are defined in **JSON** format and can be parsed directly from a raw string:
//!
//! ```
//! use mz_avro::Schema;
//!
//! let raw_schema = r#"
//!     {
//!         "type": "record",
//!         "name": "test",
//!         "fields": [
//!             {"name": "a", "type": "long", "default": 42},
//!             {"name": "b", "type": "string"}
//!         ]
//!     }
//! "#;
//!
//! // if the schema is not valid, this function will return an error
//! let schema: Schema = raw_schema.parse().unwrap();
//!
//! // schemas can be printed for debugging
//! println!("{:?}", schema);
//! ```
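//!
//! Because parsing returns a `Result`, a malformed schema can also be handled gracefully:
//!
//! ```
//! use mz_avro::Schema;
//!
//! // input that is not valid JSON (or not a valid schema) yields an error
//! let bad: Result<Schema, _> = "not even json".parse();
//! assert!(bad.is_err());
//! ```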
//!
//! For more information about schemas and what kind of information you can encapsulate in them,
//! please refer to the appropriate section of the
//! [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas).
//!
//! # Writing data
//!
//! Once we have defined a schema, we are ready to serialize data in Avro, validating it against
//! the provided schema in the process.
//!
//! **NOTE:** The library also provides a low-level interface for encoding a single datum in the
//! Avro binary format, without generating markers and headers (for advanced use), but we highly
//! recommend using the `Writer` interface to remain fully Avro-compatible. Please read the API
//! reference in case you are interested.
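//!
//! As a sketch of that low-level interface, using the `to_avro_datum` helper this crate exports
//! (shown as text, not compiled; that the datum can be passed as a `Value` is an assumption
//! carried over from this crate's avro-rs lineage):
//!
//! ```text
//! use mz_avro::{Schema, to_avro_datum};
//! use mz_avro::types::Value;
//!
//! let schema: Schema = r#""long""#.parse().unwrap();
//! // encodes a bare datum: no container-file header, no sync markers
//! let bytes = to_avro_datum(&schema, Value::Long(42)).unwrap();
//! ```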
//!
//! Given that the schema we defined above is that of an Avro *Record*, we are going to use the
//! associated type provided by the library to specify the data we want to serialize:
//!
//! ```
//! # use mz_avro::Schema;
//! use mz_avro::types::Record;
//! use mz_avro::Writer;
//! #
//! # let raw_schema = r#"
//! #     {
//! #         "type": "record",
//! #         "name": "test",
//! #         "fields": [
//! #             {"name": "a", "type": "long", "default": 42},
//! #             {"name": "b", "type": "string"}
//! #         ]
//! #     }
//! # "#;
//! # let schema: Schema = raw_schema.parse().unwrap();
//! // a writer needs a schema and something to write to
//! let mut writer = Writer::new(schema.clone(), Vec::new());
//!
//! // the Record type models our Record schema
//! let mut record = Record::new(schema.top_node()).unwrap();
//! record.put("a", 27i64);
//! record.put("b", "foo");
//!
//! // schema validation happens here
//! writer.append(record).unwrap();
//!
//! // flushing makes sure that all data gets encoded
//! writer.flush().unwrap();
//!
//! // this is how to get back the resulting Avro-encoded bytes
//! let encoded = writer.into_inner();
//! ```
//!
//! Most of the time, a schema defines a record as the top-level container encapsulating all the
//! values to convert as fields, and providing documentation for them. But if we want to define
//! an Avro value directly, the library offers that capability via the `Value` interface:
//!
//! ```
//! use mz_avro::types::Value;
//!
//! let value = Value::String("foo".to_string());
//! ```
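//!
//! Compound values can be assembled the same way. For instance, a record matching the schema
//! above can be built directly from its fields (a sketch; the field order must match the
//! schema's):
//!
//! ```
//! use mz_avro::types::Value;
//!
//! let record = Value::Record(vec![
//!     ("a".to_string(), Value::Long(27)),
//!     ("b".to_string(), Value::String("foo".to_string())),
//! ]);
//! ```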
//!
//! ## Using codecs to compress data
//!
//! Avro supports three different compression codecs when encoding data:
//!
//! * **Null**: leaves data uncompressed;
//! * **Deflate**: writes the data block using the deflate algorithm as specified in RFC 1951,
//!   typically implemented using the zlib library. Note that this format (unlike the "zlib
//!   format" in RFC 1950) does not have a checksum.
//! * **Snappy**: uses Google's [Snappy](http://google.github.io/snappy/) compression library.
//!   Each compressed block is followed by the 4-byte, big-endian CRC32 checksum of the
//!   uncompressed data in the block. You must enable the `snappy` feature to use this codec.
//!
//! To compress data with one of these codecs, specify it when creating the `Writer`:
//!
//! ```
//! # use mz_avro::Schema;
//! use mz_avro::Writer;
//! use mz_avro::Codec;
//! #
//! # let raw_schema = r#"
//! #     {
//! #         "type": "record",
//! #         "name": "test",
//! #         "fields": [
//! #             {"name": "a", "type": "long", "default": 42},
//! #             {"name": "b", "type": "string"}
//! #         ]
//! #     }
//! # "#;
//! # let schema: Schema = raw_schema.parse().unwrap();
//! let mut writer = Writer::with_codec(schema, Vec::new(), Codec::Deflate);
//! ```
//!
//! # Reading data
//!
//! As far as reading Avro-encoded data goes, we can use the schema encoded alongside the data to
//! read it. The library does this automatically for us, as it already does for the compression
//! codec:
//!
//! ```
//! use mz_avro::Reader;
//! # use mz_avro::Schema;
//! # use mz_avro::types::Record;
//! # use mz_avro::Writer;
//! #
//! # let raw_schema = r#"
//! #     {
//! #         "type": "record",
//! #         "name": "test",
//! #         "fields": [
//! #             {"name": "a", "type": "long", "default": 42},
//! #             {"name": "b", "type": "string"}
//! #         ]
//! #     }
//! # "#;
//! # let schema: Schema = raw_schema.parse().unwrap();
//! # let mut writer = Writer::new(schema.clone(), Vec::new());
//! # let mut record = Record::new(schema.top_node()).unwrap();
//! # record.put("a", 27i64);
//! # record.put("b", "foo");
//! # writer.append(record).unwrap();
//! # writer.flush().unwrap();
//! # let input = writer.into_inner();
//! // reader creation can fail in case the input to read from is not Avro-compatible or malformed
//! let reader = Reader::new(&input[..]).unwrap();
//! ```
//!
//! If, instead, we want to specify a different (but compatible) reader schema from the schema
//! the data was written with, we can do the following:
//! ```
//! use mz_avro::Schema;
//! use mz_avro::Reader;
//! # use mz_avro::types::Record;
//! # use mz_avro::Writer;
//! #
//! # let writer_raw_schema = r#"
//! #     {
//! #         "type": "record",
//! #         "name": "test",
//! #         "fields": [
//! #             {"name": "a", "type": "long", "default": 42},
//! #             {"name": "b", "type": "string"}
//! #         ]
//! #     }
//! # "#;
//! # let writer_schema: Schema = writer_raw_schema.parse().unwrap();
//! # let mut writer = Writer::new(writer_schema.clone(), Vec::new());
//! # let mut record = Record::new(writer_schema.top_node()).unwrap();
//! # record.put("a", 27i64);
//! # record.put("b", "foo");
//! # writer.append(record).unwrap();
//! # writer.flush().unwrap();
//! # let input = writer.into_inner();
//!
//! let reader_raw_schema = r#"
//!     {
//!         "type": "record",
//!         "name": "test",
//!         "fields": [
//!             {"name": "a", "type": "long", "default": 42},
//!             {"name": "b", "type": "string"},
//!             {"name": "c", "type": "long", "default": 43}
//!         ]
//!     }
//! "#;
//!
//! let reader_schema: Schema = reader_raw_schema.parse().unwrap();
//!
//! // reader creation can fail in case the input to read from is not Avro-compatible or malformed
//! let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
//! ```
//!
//! The library will also automatically perform schema resolution while reading the data.
//!
//! For more information about schema compatibility and resolution, please refer to the
//! [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas).
//!
//! There are two ways to deserialize Avro data in Rust, described below.
//!
//! **NOTE:** The library also provides a low-level interface for decoding a single datum in the
//! Avro binary format, without markers and headers (for advanced use), but we highly recommend
//! using the `Reader` interface to leverage all Avro features. Please read the API reference in
//! case you are interested.
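//!
//! As a minimal sketch of that low-level interface, using the `from_avro_datum` helper this
//! crate exports (the byte `0x54` is the Avro zig-zag encoding of the long `42`):
//!
//! ```
//! use mz_avro::{Schema, from_avro_datum};
//! use mz_avro::types::Value;
//!
//! let schema: Schema = r#""long""#.parse().unwrap();
//! // a bare datum: no container-file header, no sync markers
//! let bytes = [0x54];
//! let value = from_avro_datum(&schema, &mut &bytes[..]).unwrap();
//! assert_eq!(value, Value::Long(42));
//! ```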
//!
//! ## The avro way
//!
//! We can read instances of `Value` directly out of the `Reader` iterator:
//!
//! ```
//! # use mz_avro::Schema;
//! # use mz_avro::types::Record;
//! # use mz_avro::Writer;
//! use mz_avro::Reader;
//! #
//! # let raw_schema = r#"
//! #     {
//! #         "type": "record",
//! #         "name": "test",
//! #         "fields": [
//! #             {"name": "a", "type": "long", "default": 42},
//! #             {"name": "b", "type": "string"}
//! #         ]
//! #     }
//! # "#;
//! # let schema: Schema = raw_schema.parse().unwrap();
//! # let mut writer = Writer::new(schema.clone(), Vec::new());
//! # let mut record = Record::new(schema.top_node()).unwrap();
//! # record.put("a", 27i64);
//! # record.put("b", "foo");
//! # writer.append(record).unwrap();
//! # writer.flush().unwrap();
//! # let input = writer.into_inner();
//! let reader = Reader::new(&input[..]).unwrap();
//!
//! // each value is a Result, in case the read operation fails
//! for value in reader {
//!     println!("{:?}", value.unwrap());
//! }
//! ```
//!
//! ## Custom deserialization (advanced)
//!
//! It is possible to avoid the intermediate stage of decoding to `Value` by implementing
//! `AvroDecode` for one or more structs that determine how to decode the various pieces of a
//! schema.
//!
//! This API is in flux, and more complete documentation is coming soon. For now,
//! [Materialize](https://github.com/MaterializeInc/materialize/blob/main/src/interchange/src/avro.rs)
//! furnishes the most complete example.

// TODO(benesch): remove this once this crate no longer makes use of potentially
// dangerous `as` conversions.
#![allow(clippy::as_conversions)]

mod codec;
mod decode;
mod reader;
mod util;
mod writer;

pub mod encode;
pub mod error;
pub mod schema;
pub mod types;

pub use crate::codec::Codec;
pub use crate::decode::public_decoders::*;
pub use crate::decode::{
    AvroArrayAccess, AvroDecodable, AvroDecode, AvroDeserializer, AvroFieldAccess, AvroMapAccess,
    AvroRead, AvroRecordAccess, GeneralDeserializer, Skip, StatefulAvroDecodable, ValueOrReader,
    give_value,
};
pub use crate::encode::encode as encode_unchecked;
pub use crate::reader::{Block, BlockIter, Reader, from_avro_datum};
pub use crate::schema::{ParseSchemaError, Schema};
pub use crate::types::SchemaResolutionError;
pub use crate::writer::{ValidationError, Writer, to_avro_datum, write_avro_datum};

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_ore::{assert_err, assert_none};

    use crate::reader::Reader;
    use crate::schema::Schema;
    use crate::types::{Record, Value};

    use super::*;

    // TODO: move where it fits better
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
    fn test_enum_default() {
        let writer_raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
        let reader_raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"},
                    {
                        "name": "c",
                        "type": {
                            "type": "enum",
                            "name": "suit",
                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
                        },
                        "default": "spades"
                    }
                ]
            }
        "#;
        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
        let reader_schema = Schema::from_str(reader_raw_schema).unwrap();
        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
        let mut record = Record::new(writer_schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        writer.append(record).unwrap();
        writer.flush().unwrap();
        let input = writer.into_inner();
        let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
        assert_eq!(
            reader.next().unwrap().unwrap(),
            Value::Record(vec![
                ("a".to_string(), Value::Long(27)),
                ("b".to_string(), Value::String("foo".to_string())),
                ("c".to_string(), Value::Enum(1, "spades".to_string())),
            ])
        );
        assert_none!(reader.next());
    }

    // TODO: move where it fits better
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
    fn test_enum_string_value() {
        let raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"},
                    {
                        "name": "c",
                        "type": {
                            "type": "enum",
                            "name": "suit",
                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
                        },
                        "default": "spades"
                    }
                ]
            }
        "#;
        let schema = Schema::from_str(raw_schema).unwrap();
        let mut writer = Writer::with_codec(schema.clone(), Vec::new(), Codec::Null);
        let mut record = Record::new(schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        record.put("c", "clubs");
        writer.append(record).unwrap();
        writer.flush().unwrap();
        let input = writer.into_inner();
        let mut reader = Reader::with_schema(&schema, &input[..]).unwrap();
        assert_eq!(
            reader.next().unwrap().unwrap(),
            Value::Record(vec![
                ("a".to_string(), Value::Long(27)),
                ("b".to_string(), Value::String("foo".to_string())),
                ("c".to_string(), Value::Enum(2, "clubs".to_string())),
            ])
        );
        assert_none!(reader.next());
    }

    // TODO: move where it fits better
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
    fn test_enum_resolution() {
        let writer_raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"},
                    {
                        "name": "c",
                        "type": {
                            "type": "enum",
                            "name": "suit",
                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
                        },
                        "default": "spades"
                    }
                ]
            }
        "#;
        let reader_raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"},
                    {
                        "name": "c",
                        "type": {
                            "type": "enum",
                            "name": "suit",
                            "symbols": ["diamonds", "spades", "ninja", "hearts"]
                        },
                        "default": "spades"
                    }
                ]
            }
        "#;
        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
        let reader_schema = Schema::from_str(reader_raw_schema).unwrap();
        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
        let mut record = Record::new(writer_schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        record.put("c", "clubs");
        writer.append(record).unwrap();
        writer.flush().unwrap();
        let input = writer.into_inner();
        let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
        assert_err!(reader.next().unwrap());
        assert_none!(reader.next());
    }

    // TODO: move where it fits better
    #[mz_ore::test]
    fn test_enum_no_reader_schema() {
        let writer_raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"},
                    {
                        "name": "c",
                        "type": {
                            "type": "enum",
                            "name": "suit",
                            "symbols": ["diamonds", "spades", "clubs", "hearts"]
                        },
                        "default": "spades"
                    }
                ]
            }
        "#;
        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
        let mut record = Record::new(writer_schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        record.put("c", "clubs");
        writer.append(record).unwrap();
        writer.flush().unwrap();
        let input = writer.into_inner();
        let mut reader = Reader::new(&input[..]).unwrap();
        assert_eq!(
            reader.next().unwrap().unwrap(),
            Value::Record(vec![
                ("a".to_string(), Value::Long(27)),
                ("b".to_string(), Value::String("foo".to_string())),
                ("c".to_string(), Value::Enum(2, "clubs".to_string())),
            ])
        );
    }

    #[mz_ore::test]
    fn test_datetime_value() {
        let writer_raw_schema = r#"{
            "type": "record",
            "name": "dttest",
            "fields": [
                {
                    "name": "a",
                    "type": {
                        "type": "long",
                        "logicalType": "timestamp-micros"
                    }
                }
            ]}"#;
        let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
        let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
        let mut record = Record::new(writer_schema.top_node()).unwrap();
        let dt = chrono::DateTime::from_timestamp(1_000, 995_000_000)
            .unwrap()
            .naive_utc();
        record.put("a", types::Value::Timestamp(dt));
        writer.append(record).unwrap();
        writer.flush().unwrap();
        let input = writer.into_inner();
        let mut reader = Reader::new(&input[..]).unwrap();
        assert_eq!(
            reader.next().unwrap().unwrap(),
            Value::Record(vec![("a".to_string(), Value::Timestamp(dt))])
        );
    }

    #[mz_ore::test]
    fn test_malformed_length() {
        let raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;

        let schema = Schema::from_str(raw_schema).unwrap();

        // Would allocate 18446744073709551605 bytes
        let malformed: &[u8] = &[0x3e, 0x15, 0xff, 0x1f, 0x15, 0xff];

        let value = from_avro_datum(&schema, &mut &malformed[..]);
        assert_err!(value);
    }
}
618}