mz_avro/lib.rs
1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! # avro
25//! **[Apache Avro](https://avro.apache.org/)** is a data serialization system which provides rich
26//! data structures and a compact, fast, binary data format.
27//!
28//! All data in Avro is schematized, as in the following example:
29//!
30//! ```text
31//! {
32//! "type": "record",
33//! "name": "test",
34//! "fields": [
35//! {"name": "a", "type": "long", "default": 42},
36//! {"name": "b", "type": "string"}
37//! ]
38//! }
39//! ```
40//!
41//! There are basically two ways of handling Avro data in Rust:
42//!
43//! * **as Avro-specialized data types** based on an Avro schema;
44//! * **as generic Rust types** with custom serialization logic implementing `AvroDecode`
45//! (currently only supports deserialization, not serialization).
46//!
47//! **avro** provides a way to read and write both these data representations easily and
48//! efficiently.
49//!
50//! # Installing the library
51//!
52//!
53//! Add to your `Cargo.toml`:
54//!
55//! ```text
56//! [dependencies]
57//! avro = "x.y"
58//! ```
59//!
60//! Or in case you want to leverage the **Snappy** codec:
61//!
62//! ```text
63//! [dependencies.avro]
64//! version = "x.y"
65//! features = ["snappy"]
66//! ```
67//!
68//! # Defining a schema
69//!
70//! Avro data cannot exist without an Avro schema. Schemas **must** be used both while writing and
71//! reading and they carry the information regarding the type of data we are
72//! handling. Avro schemas are used for both schema validation and resolution of Avro data.
73//!
74//! Avro schemas are defined in **JSON** format and can just be parsed out of a raw string:
75//!
76//! ```
77//! use mz_avro::Schema;
78//!
79//! let raw_schema = r#"
80//! {
81//! "type": "record",
82//! "name": "test",
83//! "fields": [
84//! {"name": "a", "type": "long", "default": 42},
85//! {"name": "b", "type": "string"}
86//! ]
87//! }
88//! "#;
89//!
90//! // if the schema is not valid, this function will return an error
91//! let schema: Schema = raw_schema.parse().unwrap();
92//!
93//! // schemas can be printed for debugging
94//! println!("{:?}", schema);
95//! ```
96//!
97//! For more information about schemas and what kind of information you can encapsulate in them,
98//! please refer to the appropriate section of the
99//! [Avro Specification](https://avro.apache.org/docs/++version++/specification/#schema-declaration).
100//!
101//! # Writing data
102//!
103//! Once we have defined a schema, we are ready to serialize data in Avro, validating them against
104//! the provided schema in the process.
105//!
106//! **NOTE:** The library also provides a low-level interface for encoding a single datum in Avro
107//! bytecode without generating markers and headers (for advanced use), but we highly recommend the
108//! `Writer` interface to be totally Avro-compatible. Please read the API reference in case you are
109//! interested.
110//!
111//! Given that the schema we defined above is that of an Avro *Record*, we are going to use the
112//! associated type provided by the library to specify the data we want to serialize:
113//!
114//! ```
115//! # use mz_avro::Schema;
116//! use mz_avro::types::Record;
117//! use mz_avro::Writer;
118//! #
119//! # let raw_schema = r#"
120//! # {
121//! # "type": "record",
122//! # "name": "test",
123//! # "fields": [
124//! # {"name": "a", "type": "long", "default": 42},
125//! # {"name": "b", "type": "string"}
126//! # ]
127//! # }
128//! # "#;
129//! # let schema: Schema = raw_schema.parse().unwrap();
130//! // a writer needs a schema and something to write to
131//! let mut writer = Writer::new(schema.clone(), Vec::new());
132//!
133//! // the Record type models our Record schema
134//! let mut record = Record::new(schema.top_node()).unwrap();
135//! record.put("a", 27i64);
136//! record.put("b", "foo");
137//!
138//! // schema validation happens here
139//! writer.append(record).unwrap();
140//!
141//! // flushing makes sure that all data gets encoded
142//! writer.flush().unwrap();
143//!
144//! // this is how to get back the resulting avro bytecode
145//! let encoded = writer.into_inner();
146//! ```
147//!
148//! The vast majority of the time, schemas tend to define a record as a top-level container
149//! encapsulating all the values to convert as fields and providing documentation for them, but in
150//! case we want to directly define an Avro value, the library offers that capability via the
151//! `Value` interface.
152//!
153//! ```
154//! use mz_avro::types::Value;
155//!
156//! let mut value = Value::String("foo".to_string());
157//! ```
158//!
159//! ## Using codecs to compress data
160//!
161//! Avro supports three different compression codecs when encoding data:
162//!
163//! * **Null**: leaves data uncompressed;
164//! * **Deflate**: writes the data block using the deflate algorithm as specified in RFC 1951, and
165//! typically implemented using the zlib library. Note that this format (unlike the "zlib format" in
166//! RFC 1950) does not have a checksum.
167//! * **Snappy**: uses Google's [Snappy](http://google.github.io/snappy/) compression library. Each
//! compressed block is followed by the 4-byte, big-endian CRC32 checksum of the uncompressed data in
169//! the block. You must enable the `snappy` feature to use this codec.
170//!
171//! To specify a codec to use to compress data, just specify it while creating a `Writer`:
172//! ```
173//! # use mz_avro::Schema;
174//! use mz_avro::Writer;
175//! use mz_avro::Codec;
176//! #
177//! # let raw_schema = r#"
178//! # {
179//! # "type": "record",
180//! # "name": "test",
181//! # "fields": [
182//! # {"name": "a", "type": "long", "default": 42},
183//! # {"name": "b", "type": "string"}
184//! # ]
185//! # }
186//! # "#;
187//! # let schema: Schema = raw_schema.parse().unwrap();
188//! let mut writer = Writer::with_codec(schema, Vec::new(), Codec::Deflate);
189//! ```
190//!
191//! # Reading data
192//!
193//! As far as reading Avro encoded data goes, we can just use the schema encoded with the data to
194//! read them. The library will do it automatically for us, as it already does for the compression
195//! codec:
196//!
197//! ```
198//!
199//! use mz_avro::Reader;
200//! # use mz_avro::Schema;
201//! # use mz_avro::types::Record;
202//! # use mz_avro::Writer;
203//! #
204//! # let raw_schema = r#"
205//! # {
206//! # "type": "record",
207//! # "name": "test",
208//! # "fields": [
209//! # {"name": "a", "type": "long", "default": 42},
210//! # {"name": "b", "type": "string"}
211//! # ]
212//! # }
213//! # "#;
214//! # let schema: Schema = raw_schema.parse().unwrap();
215//! # let mut writer = Writer::new(schema.clone(), Vec::new());
216//! # let mut record = Record::new(schema.top_node()).unwrap();
217//! # record.put("a", 27i64);
218//! # record.put("b", "foo");
219//! # writer.append(record).unwrap();
220//! # writer.flush().unwrap();
221//! # let input = writer.into_inner();
222//! // reader creation can fail in case the input to read from is not Avro-compatible or malformed
223//! let reader = Reader::new(&input[..]).unwrap();
224//! ```
225//!
//! If, instead, we want to specify a reader schema that differs from (but is compatible with) the
//! schema the data was written with, we can do the following:
228//! ```
229//! use mz_avro::Schema;
230//! use mz_avro::Reader;
231//! # use mz_avro::types::Record;
232//! # use mz_avro::Writer;
233//! #
234//! # let writer_raw_schema = r#"
235//! # {
236//! # "type": "record",
237//! # "name": "test",
238//! # "fields": [
239//! # {"name": "a", "type": "long", "default": 42},
240//! # {"name": "b", "type": "string"}
241//! # ]
242//! # }
243//! # "#;
244//! # let writer_schema: Schema = writer_raw_schema.parse().unwrap();
245//! # let mut writer = Writer::new(writer_schema.clone(), Vec::new());
246//! # let mut record = Record::new(writer_schema.top_node()).unwrap();
247//! # record.put("a", 27i64);
248//! # record.put("b", "foo");
249//! # writer.append(record).unwrap();
250//! # writer.flush().unwrap();
251//! # let input = writer.into_inner();
252//!
253//! let reader_raw_schema = r#"
254//! {
255//! "type": "record",
256//! "name": "test",
257//! "fields": [
258//! {"name": "a", "type": "long", "default": 42},
259//! {"name": "b", "type": "string"},
260//! {"name": "c", "type": "long", "default": 43}
261//! ]
262//! }
263//! "#;
264//!
265//! let reader_schema: Schema = reader_raw_schema.parse().unwrap();
266//!
267//! // reader creation can fail in case the input to read from is not Avro-compatible or malformed
268//! let reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
269//! ```
270//!
271//! The library will also automatically perform schema resolution while reading the data.
272//!
273//! For more information about schema compatibility and resolution, please refer to the
//! [Avro Specification](https://avro.apache.org/docs/++version++/specification/#schema-resolution).
275//!
276//! There are two ways to handle deserializing Avro data in Rust, as you can see below.
277//!
278//! **NOTE:** The library also provides a low-level interface for decoding a single datum in Avro
279//! bytecode without markers and header (for advanced use), but we highly recommend the `Reader`
280//! interface to leverage all Avro features. Please read the API reference in case you are
281//! interested.
282//!
283//!
284//! ## The avro way
285//!
//! We can read instances of `Value` directly out of the `Reader` iterator:
287//!
288//! ```
289//! # use mz_avro::Schema;
290//! # use mz_avro::types::Record;
291//! # use mz_avro::Writer;
292//! use mz_avro::Reader;
293//! #
294//! # let raw_schema = r#"
295//! # {
296//! # "type": "record",
297//! # "name": "test",
298//! # "fields": [
299//! # {"name": "a", "type": "long", "default": 42},
300//! # {"name": "b", "type": "string"}
301//! # ]
302//! # }
303//! # "#;
304//! # let schema: Schema = raw_schema.parse().unwrap();
305//! # let mut writer = Writer::new(schema.clone(), Vec::new());
306//! # let mut record = Record::new(schema.top_node()).unwrap();
307//! # record.put("a", 27i64);
308//! # record.put("b", "foo");
309//! # writer.append(record).unwrap();
310//! # writer.flush().unwrap();
311//! # let input = writer.into_inner();
312//! let mut reader = Reader::new(&input[..]).unwrap();
313//!
314//! // value is a Result of an Avro Value in case the read operation fails
315//! for value in reader {
316//! println!("{:?}", value.unwrap());
317//! }
318//!
319//! ```
320//!
321//! ## Custom deserialization (advanced)
322//!
323//! It is possible to avoid the intermediate stage of decoding to `Value`,
324//! by implementing `AvroDecode` for one or more structs that will determine how to decode various schema pieces.
325//!
326//! This API is in flux, and more complete documentation is coming soon. For now,
327//! [Materialize](https://github.com/MaterializeInc/materialize/blob/main/src/interchange/src/avro.rs)
328//! furnishes the most complete example.
329
330// TODO(benesch): remove this once this crate no longer makes use of potentially
331// dangerous `as` conversions.
332#![allow(clippy::as_conversions)]
333
334mod codec;
335mod decode;
336mod reader;
337mod util;
338mod writer;
339
340pub mod encode;
341pub mod error;
342pub mod schema;
343pub mod types;
344
345pub use crate::codec::Codec;
346pub use crate::decode::public_decoders::*;
347pub use crate::decode::{
348 AvroArrayAccess, AvroDecodable, AvroDecode, AvroDeserializer, AvroFieldAccess, AvroMapAccess,
349 AvroRead, AvroRecordAccess, GeneralDeserializer, Skip, StatefulAvroDecodable, ValueOrReader,
350 give_value,
351};
352pub use crate::encode::encode as encode_unchecked;
353pub use crate::reader::{Block, BlockIter, Reader, from_avro_datum};
354pub use crate::schema::{ParseSchemaError, Schema};
355pub use crate::types::SchemaResolutionError;
356pub use crate::writer::{ValidationError, Writer, to_avro_datum, write_avro_datum};
357
358#[cfg(test)]
359mod tests {
360 use std::str::FromStr;
361
362 use mz_ore::{assert_err, assert_none};
363
364 use crate::reader::Reader;
365 use crate::schema::Schema;
366 use crate::types::{Record, Value};
367
368 use super::*;
369
370 //TODO: move where it fits better
371 #[mz_ore::test]
372 #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
373 fn test_enum_default() {
374 let writer_raw_schema = r#"
375 {
376 "type": "record",
377 "name": "test",
378 "fields": [
379 {"name": "a", "type": "long", "default": 42},
380 {"name": "b", "type": "string"}
381 ]
382 }
383 "#;
384 let reader_raw_schema = r#"
385 {
386 "type": "record",
387 "name": "test",
388 "fields": [
389 {"name": "a", "type": "long", "default": 42},
390 {"name": "b", "type": "string"},
391 {
392 "name": "c",
393 "type": {
394 "type": "enum",
395 "name": "suit",
396 "symbols": ["diamonds", "spades", "clubs", "hearts"]
397 },
398 "default": "spades"
399 }
400 ]
401 }
402 "#;
403 let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
404 let reader_schema = Schema::from_str(reader_raw_schema).unwrap();
405 let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
406 let mut record = Record::new(writer_schema.top_node()).unwrap();
407 record.put("a", 27i64);
408 record.put("b", "foo");
409 writer.append(record).unwrap();
410 writer.flush().unwrap();
411 let input = writer.into_inner();
412 let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
413 assert_eq!(
414 reader.next().unwrap().unwrap(),
415 Value::Record(vec![
416 ("a".to_string(), Value::Long(27)),
417 ("b".to_string(), Value::String("foo".to_string())),
418 ("c".to_string(), Value::Enum(1, "spades".to_string())),
419 ])
420 );
421 assert_none!(reader.next());
422 }
423
424 //TODO: move where it fits better
425 #[mz_ore::test]
426 #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
427 fn test_enum_string_value() {
428 let raw_schema = r#"
429 {
430 "type": "record",
431 "name": "test",
432 "fields": [
433 {"name": "a", "type": "long", "default": 42},
434 {"name": "b", "type": "string"},
435 {
436 "name": "c",
437 "type": {
438 "type": "enum",
439 "name": "suit",
440 "symbols": ["diamonds", "spades", "clubs", "hearts"]
441 },
442 "default": "spades"
443 }
444 ]
445 }
446 "#;
447 let schema = Schema::from_str(raw_schema).unwrap();
448 let mut writer = Writer::with_codec(schema.clone(), Vec::new(), Codec::Null);
449 let mut record = Record::new(schema.top_node()).unwrap();
450 record.put("a", 27i64);
451 record.put("b", "foo");
452 record.put("c", "clubs");
453 writer.append(record).unwrap();
454 writer.flush().unwrap();
455 let input = writer.into_inner();
456 let mut reader = Reader::with_schema(&schema, &input[..]).unwrap();
457 assert_eq!(
458 reader.next().unwrap().unwrap(),
459 Value::Record(vec![
460 ("a".to_string(), Value::Long(27)),
461 ("b".to_string(), Value::String("foo".to_string())),
462 ("c".to_string(), Value::Enum(2, "clubs".to_string())),
463 ])
464 );
465 assert_none!(reader.next());
466 }
467
468 //TODO: move where it fits better
469 #[mz_ore::test]
470 #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
471 fn test_enum_resolution() {
472 let writer_raw_schema = r#"
473 {
474 "type": "record",
475 "name": "test",
476 "fields": [
477 {"name": "a", "type": "long", "default": 42},
478 {"name": "b", "type": "string"},
479 {
480 "name": "c",
481 "type": {
482 "type": "enum",
483 "name": "suit",
484 "symbols": ["diamonds", "spades", "clubs", "hearts"]
485 },
486 "default": "spades"
487 }
488 ]
489 }
490 "#;
491 let reader_raw_schema = r#"
492 {
493 "type": "record",
494 "name": "test",
495 "fields": [
496 {"name": "a", "type": "long", "default": 42},
497 {"name": "b", "type": "string"},
498 {
499 "name": "c",
500 "type": {
501 "type": "enum",
502 "name": "suit",
503 "symbols": ["diamonds", "spades", "ninja", "hearts"]
504 },
505 "default": "spades"
506 }
507 ]
508 }
509 "#;
510 let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
511 let reader_schema = Schema::from_str(reader_raw_schema).unwrap();
512 let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
513 let mut record = Record::new(writer_schema.top_node()).unwrap();
514 record.put("a", 27i64);
515 record.put("b", "foo");
516 record.put("c", "clubs");
517 writer.append(record).unwrap();
518 writer.flush().unwrap();
519 let input = writer.into_inner();
520 let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
521 assert_err!(reader.next().unwrap());
522 assert_none!(reader.next());
523 }
524
525 // Avro numeric promotion across every promotable pair, with the field
526 // rendered both bare (`"int"`) and nullable (`["null", "int"]`) on each
527 // side. Promoting a nullable column means promoting inside the `["null",
528 // T]` union; that previously failed at decode time with "Failed to match
529 // writer union variant ... against any variant in the reader" because union
530 // variant matching ignored promotion. The four bare/nullable combinations
531 // cover concrete -> union, union -> concrete, union -> union, and the plain
532 // concrete -> concrete path.
533 #[mz_ore::test]
534 #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
535 fn test_numeric_promotion_matrix() {
536 // `123` is exactly representable in int/long/float/double, so a value
537 // promoted to `kind` equals that kind's representative value.
538 fn value_of(kind: &str) -> Value {
539 match kind {
540 "int" => Value::Int(123),
541 "long" => Value::Long(123),
542 "float" => Value::Float(123.0),
543 "double" => Value::Double(123.0),
544 other => panic!("unhandled kind {other}"),
545 }
546 }
547 fn record_schema_json(kind: &str, nullable: bool) -> String {
548 let ty = if nullable {
549 format!(r#"["null", "{kind}"]"#)
550 } else {
551 format!(r#""{kind}""#)
552 };
553 format!(r#"{{"type":"record","name":"test","fields":[{{"name":"f1","type":{ty}}}]}}"#)
554 }
555 // A nullable field is a `["null", T]` union, so the value lives in
556 // variant 1 with the null variant at 0.
557 fn maybe_union(v: Value, nullable: bool) -> Value {
558 if nullable {
559 Value::Union {
560 index: 1,
561 inner: Box::new(v),
562 n_variants: 2,
563 null_variant: Some(0),
564 }
565 } else {
566 v
567 }
568 }
569
570 // Every promotable numeric pair (int/long/float/double widening).
571 let pairs = [
572 ("int", "long"),
573 ("int", "float"),
574 ("int", "double"),
575 ("long", "float"),
576 ("long", "double"),
577 ("float", "double"),
578 ];
579 for (writer_kind, reader_kind) in pairs {
580 for writer_nullable in [false, true] {
581 for reader_nullable in [false, true] {
582 let writer_schema =
583 Schema::from_str(&record_schema_json(writer_kind, writer_nullable))
584 .unwrap();
585 let reader_schema =
586 Schema::from_str(&record_schema_json(reader_kind, reader_nullable))
587 .unwrap();
588 let mut writer =
589 Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
590 let mut record = Record::new(writer_schema.top_node()).unwrap();
591 record.put("f1", maybe_union(value_of(writer_kind), writer_nullable));
592 writer.append(record).unwrap();
593 writer.flush().unwrap();
594 let input = writer.into_inner();
595
596 let mut reader = Reader::with_schema(&reader_schema, &input[..]).unwrap();
597 let expected = maybe_union(value_of(reader_kind), reader_nullable);
598 assert_eq!(
599 reader.next().unwrap().unwrap(),
600 Value::Record(vec![("f1".to_string(), expected)]),
601 "promotion {writer_kind}{} -> {reader_kind}{} decoded incorrectly",
602 if writer_nullable { " (nullable)" } else { "" },
603 if reader_nullable { " (nullable)" } else { "" },
604 );
605 assert_none!(reader.next());
606 }
607 }
608 }
609 }
610
611 // Promotion-aware union matching must still *reject* changes that are not
612 // valid promotions; a too-permissive matcher would match a variant and then
613 // mis-decode or silently corrupt the value. Each case is a numeric
614 // narrowing (`double` -> `int`) or an incompatible change (`string` ->
615 // `int`), across the same bare/nullable combinations as the positive
616 // matrix, and must fail rather than produce a value.
617 #[mz_ore::test]
618 #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
619 fn test_non_promotable_changes_are_rejected() {
620 fn record_with_field(ty: &str) -> Schema {
621 Schema::from_str(&format!(
622 r#"{{"type":"record","name":"test","fields":[{{"name":"f1","type":{ty}}}]}}"#
623 ))
624 .unwrap()
625 }
626 // (writer field type, writer value, reader field type).
627 let cases = [
628 // Numeric narrowing, exercising each union matcher's reject path.
629 (
630 r#"["null", "double"]"#,
631 Value::Double(1.0),
632 r#"["null", "int"]"#,
633 ), // union -> union
634 (r#""double""#, Value::Double(1.0), r#"["null", "int"]"#), // bare -> union
635 (r#"["null", "double"]"#, Value::Double(1.0), r#""int""#), // union -> bare
636 (r#""double""#, Value::Double(1.0), r#""int""#), // bare -> bare
637 // Incompatible (non-numeric) change.
638 (
639 r#"["null", "string"]"#,
640 Value::String("x".to_string()),
641 r#"["null", "int"]"#,
642 ),
643 (r#""string""#, Value::String("x".to_string()), r#""int""#),
644 ];
645 for (writer_ty, writer_value, reader_ty) in cases {
646 let writer_schema = record_with_field(writer_ty);
647 let reader_schema = record_with_field(reader_ty);
648 let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
649 let mut record = Record::new(writer_schema.top_node()).unwrap();
650 let field_value = if writer_ty.starts_with('[') {
651 Value::Union {
652 index: 1,
653 inner: Box::new(writer_value),
654 n_variants: 2,
655 null_variant: Some(0),
656 }
657 } else {
658 writer_value
659 };
660 record.put("f1", field_value);
661 writer.append(record).unwrap();
662 writer.flush().unwrap();
663 let input = writer.into_inner();
664
665 // Resolution may reject the pair up front, or decoding may error;
666 // either is acceptable, but it must never silently yield a value.
667 let produced_value = match Reader::with_schema(&reader_schema, &input[..]) {
668 Err(_) => false,
669 Ok(mut reader) => matches!(reader.next(), Some(Ok(_))),
670 };
671 assert!(
672 !produced_value,
673 "non-promotable change {writer_ty} -> {reader_ty} was not rejected",
674 );
675 }
676 }
677
678 //TODO: move where it fits better
679 #[mz_ore::test]
680 fn test_enum_no_reader_schema() {
681 let writer_raw_schema = r#"
682 {
683 "type": "record",
684 "name": "test",
685 "fields": [
686 {"name": "a", "type": "long", "default": 42},
687 {"name": "b", "type": "string"},
688 {
689 "name": "c",
690 "type": {
691 "type": "enum",
692 "name": "suit",
693 "symbols": ["diamonds", "spades", "clubs", "hearts"]
694 },
695 "default": "spades"
696 }
697 ]
698 }
699 "#;
700 let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
701 let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
702 let mut record = Record::new(writer_schema.top_node()).unwrap();
703 record.put("a", 27i64);
704 record.put("b", "foo");
705 record.put("c", "clubs");
706 writer.append(record).unwrap();
707 writer.flush().unwrap();
708 let input = writer.into_inner();
709 let mut reader = Reader::new(&input[..]).unwrap();
710 assert_eq!(
711 reader.next().unwrap().unwrap(),
712 Value::Record(vec![
713 ("a".to_string(), Value::Long(27)),
714 ("b".to_string(), Value::String("foo".to_string())),
715 ("c".to_string(), Value::Enum(2, "clubs".to_string())),
716 ])
717 );
718 }
719 #[mz_ore::test]
720 fn test_datetime_value() {
721 let writer_raw_schema = r#"{
722 "type": "record",
723 "name": "dttest",
724 "fields": [
725 {
726 "name": "a",
727 "type": {
728 "type": "long",
729 "logicalType": "timestamp-micros"
730 }
731 }
732 ]}"#;
733 let writer_schema = Schema::from_str(writer_raw_schema).unwrap();
734 let mut writer = Writer::with_codec(writer_schema.clone(), Vec::new(), Codec::Null);
735 let mut record = Record::new(writer_schema.top_node()).unwrap();
736 let dt = chrono::DateTime::from_timestamp(1_000, 995_000_000)
737 .unwrap()
738 .naive_utc();
739 record.put("a", types::Value::Timestamp(dt));
740 writer.append(record).unwrap();
741 writer.flush().unwrap();
742 let input = writer.into_inner();
743 let mut reader = Reader::new(&input[..]).unwrap();
744 assert_eq!(
745 reader.next().unwrap().unwrap(),
746 Value::Record(vec![("a".to_string(), Value::Timestamp(dt)),])
747 );
748 }
749
750 #[mz_ore::test]
751 fn test_malformed_length() {
752 let raw_schema = r#"
753 {
754 "type": "record",
755 "name": "test",
756 "fields": [
757 {"name": "a", "type": "long", "default": 42},
758 {"name": "b", "type": "string"}
759 ]
760 }
761 "#;
762
763 let schema = Schema::from_str(raw_schema).unwrap();
764
765 // Would allocated 18446744073709551605 bytes
766 let malformed: &[u8] = &[0x3e, 0x15, 0xff, 0x1f, 0x15, 0xff];
767
768 let value = from_avro_datum(&schema, &mut &malformed[..]);
769 assert_err!(value);
770 }
771}