mz_avro/
reader.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic handling reading from Avro format at user level.
25
26use std::collections::BTreeMap;
27use std::str::{FromStr, from_utf8};
28
29use serde_json::from_slice;
30use sha2::Sha256;
31
32use crate::decode::{AvroRead, decode};
33use crate::error::{DecodeError, Error as AvroError};
34use crate::schema::{
35    FullName, NamedSchemaPiece, ParseSchemaError, RecordField, ResolvedDefaultValueField,
36    ResolvedRecordField, Schema, SchemaNodeOrNamed, SchemaPiece, SchemaPieceOrNamed,
37    SchemaPieceRefOrNamed, resolve_schemas,
38};
39use crate::types::Value;
40use crate::{Codec, SchemaResolutionError, util};
41
/// The parsed header of an Avro object container file.
///
/// Produced by `Header::from_reader`, which reads the magic bytes, the
/// metadata map and the sync marker from the start of the stream.
#[derive(Debug, Clone)]
pub(crate) struct Header {
    /// Schema parsed from the `avro.schema` metadata entry.
    writer_schema: Schema,
    /// The 16 bytes read immediately after the metadata map. `Reader`
    /// compares the 16 bytes that follow every data block against this value.
    marker: [u8; 16],
    /// Codec named by the `avro.codec` metadata entry, or `Codec::Null` when
    /// that entry is absent.
    codec: Codec,
}
48
49impl Header {
50    pub fn from_reader<R: AvroRead>(reader: &mut R) -> Result<Header, AvroError> {
51        let meta_schema = Schema {
52            named: vec![],
53            indices: Default::default(),
54            top: SchemaPiece::Map(Box::new(SchemaPiece::Bytes.into())).into(),
55        };
56
57        let mut buf = [0u8; 4];
58        reader.read_exact(&mut buf)?;
59
60        if buf != [b'O', b'b', b'j', 1u8] {
61            return Err(AvroError::Decode(DecodeError::WrongHeaderMagic(buf)));
62        }
63
64        if let Value::Map(meta) = decode(meta_schema.top_node(), reader)? {
65            // TODO: surface original parse schema errors instead of coalescing them here
66            let json = meta
67                .get("avro.schema")
68                .ok_or(AvroError::Decode(DecodeError::MissingAvroDotSchema))
69                .and_then(|bytes| {
70                    if let Value::Bytes(ref bytes) = *bytes {
71                        from_slice(bytes.as_ref()).map_err(|e| {
72                            AvroError::ParseSchema(ParseSchemaError::new(format!(
73                                "unable to decode schema bytes: {}",
74                                e
75                            )))
76                        })
77                    } else {
78                        unreachable!()
79                    }
80                })?;
81            let writer_schema = Schema::parse(&json).map_err(|e| {
82                ParseSchemaError::new(format!("unable to parse json as avro schema: {}", e))
83            })?;
84
85            let codec = meta
86                .get("avro.codec")
87                .map(|val| match val {
88                    Value::Bytes(bytes) => from_utf8(bytes.as_ref())
89                        .map_err(|_e| AvroError::Decode(DecodeError::CodecUtf8Error))
90                        .and_then(|codec| {
91                            Codec::from_str(codec).map_err(|_| {
92                                AvroError::Decode(DecodeError::UnrecognizedCodec(codec.to_string()))
93                            })
94                        }),
95                    _ => unreachable!(),
96                })
97                .unwrap_or(Ok(Codec::Null))?;
98
99            let mut marker = [0u8; 16];
100            reader.read_exact(&mut marker)?;
101
102            Ok(Header {
103                writer_schema,
104                marker,
105                codec,
106            })
107        } else {
108            unreachable!()
109        }
110    }
111
112    pub fn into_parts(self) -> (Schema, [u8; 16], Codec) {
113        (self.writer_schema, self.marker, self.codec)
114    }
115}
116
/// Reads Avro values out of an object container file, one data block at a
/// time.
pub struct Reader<R> {
    /// Header parsed from the start of `inner` when the reader was created.
    header: Header,
    /// The underlying byte source; data blocks are read from it on demand.
    inner: R,
    /// Set by the `Iterator` implementation for `Reader` once it has yielded
    /// an error; that implementation returns `None` from then on.
    errored: bool,
    /// The writer schema resolved against a caller-supplied reader schema.
    /// `None` when no reader schema was given or when the two schemas have
    /// the same SHA-256 fingerprint.
    resolved_schema: Option<Schema>,
    /// Number of objects of the current block that have not been decoded yet.
    messages_remaining: usize,
    // Internal buffering to reduce allocation.
    /// Bytes of the current block.
    buf: Vec<u8>,
    /// Offset into `buf` of the next object to decode.
    buf_idx: usize,
}
127
/// An iterator over the `Block`s of a `Reader`
///
/// Each item is a whole data block handed out as raw bytes plus an object
/// count, rather than as individually decoded values.
pub struct BlockIter<R> {
    /// The reader that blocks are pulled from; its buffer is moved out into
    /// each yielded `Block`.
    inner: Reader<R>,
}
132
/// A block of Avro objects from an OCF file
#[derive(Debug, Clone)]
pub struct Block {
    /// The raw bytes for the block, taken from the reader's buffer after the
    /// file's codec has been applied to it
    pub bytes: Vec<u8>,
    /// The number of Avro objects in the block
    pub len: usize,
}
141
142impl<R: AvroRead> BlockIter<R> {
143    pub fn with_schema(reader_schema: &Schema, inner: R) -> Result<Self, AvroError> {
144        Ok(Self {
145            inner: Reader::with_schema(reader_schema, inner)?,
146        })
147    }
148}
149
150impl<R: AvroRead> Iterator for BlockIter<R> {
151    type Item = Result<Block, AvroError>;
152
153    fn next(&mut self) -> Option<Self::Item> {
154        assert!(self.inner.is_empty());
155
156        match self.inner.read_block_next() {
157            Ok(()) => {
158                if self.inner.is_empty() {
159                    None
160                } else {
161                    let bytes = std::mem::take(&mut self.inner.buf);
162                    let len = std::mem::take(&mut self.inner.messages_remaining);
163                    Some(Ok(Block { bytes, len }))
164                }
165            }
166            Err(e) => Some(Err(e)),
167        }
168    }
169}
170
171impl<R: AvroRead> Reader<R> {
172    /// Creates a `Reader` given something implementing the `tokio::io::AsyncRead` trait to read from.
173    /// No reader `Schema` will be set.
174    ///
175    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
176    pub fn new(mut inner: R) -> Result<Reader<R>, AvroError> {
177        let header = Header::from_reader(&mut inner)?;
178        let reader = Reader {
179            header,
180            inner,
181            errored: false,
182            resolved_schema: None,
183            messages_remaining: 0,
184            buf: vec![],
185            buf_idx: 0,
186        };
187        Ok(reader)
188    }
189
190    /// Creates a `Reader` given a reader `Schema` and something implementing the `tokio::io::AsyncRead` trait
191    /// to read from.
192    ///
193    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
194    pub fn with_schema(reader_schema: &Schema, mut inner: R) -> Result<Reader<R>, AvroError> {
195        let header = Header::from_reader(&mut inner)?;
196
197        let writer_schema = &header.writer_schema;
198        let resolved_schema = if reader_schema.fingerprint::<Sha256>().bytes
199            != writer_schema.fingerprint::<Sha256>().bytes
200        {
201            Some(resolve_schemas(writer_schema, reader_schema)?)
202        } else {
203            None
204        };
205
206        Ok(Reader {
207            header,
208            errored: false,
209            resolved_schema,
210            inner,
211            messages_remaining: 0,
212            buf: vec![],
213            buf_idx: 0,
214        })
215    }
216
217    /// Get a reference to the writer `Schema`.
218    pub fn writer_schema(&self) -> &Schema {
219        &self.header.writer_schema
220    }
221
222    /// Get a reference to the resolved schema
223    /// (or just the writer schema, if no reader schema was provided
224    ///  or the two schemas are identical)
225    pub fn schema(&self) -> &Schema {
226        match &self.resolved_schema {
227            Some(schema) => schema,
228            None => self.writer_schema(),
229        }
230    }
231
232    #[inline]
233    /// Read the next Avro value from the file, if one exists.
234    pub fn read_next(&mut self) -> Result<Option<Value>, AvroError> {
235        if self.is_empty() {
236            self.read_block_next()?;
237            if self.is_empty() {
238                return Ok(None);
239            }
240        }
241
242        let mut block_bytes = &self.buf[self.buf_idx..];
243        let b_original = block_bytes.len();
244        let schema = self.schema();
245        let item = from_avro_datum(schema, &mut block_bytes)?;
246        self.buf_idx += b_original - block_bytes.len();
247        self.messages_remaining -= 1;
248        Ok(Some(item))
249    }
250
251    fn is_empty(&self) -> bool {
252        self.messages_remaining == 0
253    }
254
255    fn fill_buf(&mut self, n: usize) -> Result<(), AvroError> {
256        // We don't have enough space in the buffer, need to grow it.
257        if n >= self.buf.len() {
258            self.buf.resize(n, 0);
259        }
260
261        self.inner.read_exact(&mut self.buf[..n])?;
262        self.buf_idx = 0;
263        Ok(())
264    }
265
266    fn read_block_next(&mut self) -> Result<(), AvroError> {
267        assert!(self.is_empty(), "Expected self to be empty!");
268        match util::read_long(&mut self.inner) {
269            Ok(block_len) => {
270                self.messages_remaining = block_len as usize;
271                let block_bytes = util::read_long(&mut self.inner)?;
272                self.fill_buf(block_bytes as usize)?;
273                let mut marker = [0u8; 16];
274                self.inner.read_exact(&mut marker)?;
275
276                if marker != self.header.marker {
277                    return Err(DecodeError::MismatchedBlockHeader {
278                        expected: self.header.marker,
279                        actual: marker,
280                    }
281                    .into());
282                }
283
284                // NOTE (JAB): This doesn't fit this Reader pattern very well.
285                // `self.buf` is a growable buffer that is reused as the reader is iterated.
286                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
287                // and replace `buf` with the new one, instead of reusing the same buffer.
288                // We can address this by using some "limited read" type to decode directly
289                // into the buffer. But this is fine, for now.
290                self.header.codec.decompress(&mut self.buf)?;
291
292                Ok(())
293            }
294            Err(e) => {
295                if let AvroError::IO(std::io::ErrorKind::UnexpectedEof) = e {
296                    // to not return any error in case we only finished to read cleanly from the stream
297                    Ok(())
298                } else {
299                    Err(e)
300                }
301            }
302        }
303    }
304}
305
306impl<R: AvroRead> Iterator for Reader<R> {
307    type Item = Result<Value, AvroError>;
308
309    fn next(&mut self) -> Option<Self::Item> {
310        // to prevent continuing to read after the first error occurs
311        if self.errored {
312            return None;
313        };
314        match self.read_next() {
315            Ok(opt) => opt.map(Ok),
316            Err(e) => {
317                self.errored = true;
318                Some(Err(e))
319            }
320        }
321    }
322}
323
/// State carried while resolving a writer schema against a reader schema.
pub struct SchemaResolver<'a> {
    /// Named schema pieces, addressed by index.
    /// NOTE(review): presumably the named pieces of the resolved schema under
    /// construction — confirm against `resolve_schemas`.
    pub named: Vec<Option<NamedSchemaPiece>>,
    /// Index assigned to each full name.
    /// NOTE(review): presumably indexes into `named` — confirm.
    pub indices: BTreeMap<FullName, usize>,
    /// Stack of type and field names pushed while descending into the reader
    /// schema; used to build the paths shown in error messages.
    pub human_readable_field_path: Vec<String>,
    /// Position in `human_readable_field_path` at which the named type
    /// currently being resolved was pushed.
    pub current_human_readable_path_start: usize,
    /// Name mapping consulted when matching a writer type against the
    /// variants of a reader union.
    pub writer_to_reader_names: BTreeMap<usize, usize>,
    /// Name mapping consulted when matching a reader type against the
    /// variants of a writer union.
    pub reader_to_writer_names: BTreeMap<usize, usize>,
    /// NOTE(review): looks like a map from reader name indices to indices in
    /// the resolved schema — not exercised in the visible code; confirm.
    pub reader_to_resolved_names: BTreeMap<usize, usize>,
    /// Full names of the reader's named types, keyed by index. Marked as
    /// allowed dead code.
    #[allow(dead_code)]
    pub reader_fullnames: BTreeMap<usize, &'a FullName>,
    /// The reader schema being resolved against.
    pub reader_schema: &'a Schema,
}
336
337impl<'a> SchemaResolver<'a> {
338    fn resolve_named(
339        &mut self,
340        writer: &Schema,
341        reader: &Schema,
342        writer_index: usize,
343        reader_index: usize,
344    ) -> Result<SchemaPiece, AvroError> {
345        let ws = writer.lookup(writer_index);
346        let rs = reader.lookup(reader_index);
347        let typ = match (&ws.piece, &rs.piece) {
348            (
349                SchemaPiece::Record {
350                    fields: w_fields,
351                    lookup: w_lookup,
352                    ..
353                },
354                SchemaPiece::Record {
355                    fields: r_fields,
356                    lookup: _r_lookup,
357                    ..
358                },
359            ) => {
360                let mut defaults = Vec::new();
361                let mut fields: Vec<Option<RecordField>> = Vec::new();
362                for (r_index, rf) in r_fields.iter().enumerate() {
363                    match w_lookup.get(&rf.name) {
364                        None => {
365                            let default_field = match &rf.default {
366                                Some(v) => ResolvedDefaultValueField {
367                                    name: rf.name.clone(),
368                                    doc: rf.doc.clone(),
369                                    default: reader
370                                        .top_node_or_named()
371                                        .step(&rf.schema)
372                                        .lookup()
373                                        .json_to_value(v)?,
374                                    order: rf.order.clone(),
375                                    position: r_index,
376                                },
377                                None => return Err(SchemaResolutionError::new(format!(
378                                    "Reader field `{}.{}` not found in writer, and has no default",
379                                    self.get_current_human_readable_path(),
380                                    rf.name
381                                ))
382                                .into()),
383                            };
384                            defaults.push(default_field);
385                        }
386                        Some(w_index) => {
387                            if fields.len() > *w_index && fields[*w_index].is_some() {
388                                return Err(SchemaResolutionError::new(format!(
389                                    "Duplicate field `{}.{}` in schema",
390                                    self.get_current_human_readable_path(),
391                                    rf.name
392                                ))
393                                .into());
394                            }
395                            let wf = &w_fields[*w_index];
396                            let w_node = SchemaNodeOrNamed {
397                                root: writer,
398                                inner: wf.schema.as_ref(),
399                            };
400                            let r_node = SchemaNodeOrNamed {
401                                root: reader,
402                                inner: rf.schema.as_ref(),
403                            };
404
405                            self.human_readable_field_path.push(rf.name.clone());
406                            let new_inner = self.resolve(w_node, r_node)?;
407                            self.human_readable_field_path.pop();
408
409                            let field = RecordField {
410                                name: rf.name.clone(),
411                                doc: rf.doc.clone(),
412                                default: rf.default.clone(),
413                                schema: new_inner,
414                                order: rf.order.clone(),
415                                position: r_index,
416                            };
417                            while fields.len() <= *w_index {
418                                fields.push(None);
419                            }
420                            fields[*w_index] = Some(field)
421                        }
422                    }
423                }
424                while fields.len() < w_fields.len() {
425                    fields.push(None);
426                }
427                let mut n_present = 0;
428                let fields = fields
429                    .into_iter()
430                    .enumerate()
431                    .map(|(i, rf)| match rf {
432                        Some(rf) => {
433                            n_present += 1;
434                            ResolvedRecordField::Present(rf)
435                        }
436                        None => {
437                            // Clone the chunk of the writer schema appearing here.
438                            // We could probably be clever and avoid some cloning,
439                            // but absolute highest performance probably isn't important for schema resolution.
440                            //
441                            // The cloned writer schema piece is needed to guide decoding of the value,
442                            // since even though it doesn't appear in the reader schema it needs
443                            // to be decoded to know where it ends.
444                            //
445                            // TODO -- We could try to come up with a "Dummy" schema variant
446                            // that does only enough decoding to find the end of a value,
447                            // and maybe save some time.
448                            let writer_schema_piece = SchemaNodeOrNamed {
449                                root: writer,
450                                inner: w_fields[i].schema.as_ref(),
451                            }
452                            .to_schema();
453                            ResolvedRecordField::Absent(writer_schema_piece)
454                        }
455                    })
456                    .collect();
457                let n_reader_fields = defaults.len() + n_present;
458                SchemaPiece::ResolveRecord {
459                    defaults,
460                    fields,
461                    n_reader_fields,
462                }
463            }
464            (
465                SchemaPiece::Enum {
466                    symbols: w_symbols, ..
467                },
468                SchemaPiece::Enum {
469                    symbols: r_symbols,
470                    doc,
471                    default_idx,
472                },
473            ) => {
474                let r_map = r_symbols
475                    .iter()
476                    .enumerate()
477                    .map(|(i, s)| (s, i))
478                    .collect::<BTreeMap<_, _>>();
479                let symbols = w_symbols
480                    .iter()
481                    .map(|s| {
482                        r_map
483                            .get(s)
484                            .map(|i| (*i, s.clone()))
485                            .ok_or_else(|| s.clone())
486                    })
487                    .collect();
488                SchemaPiece::ResolveEnum {
489                    doc: doc.clone(),
490                    symbols,
491                    default: default_idx.map(|i| (i, r_symbols[i].clone())),
492                }
493            }
494            (SchemaPiece::Fixed { size: wsz }, SchemaPiece::Fixed { size: rsz }) => {
495                if *wsz == *rsz {
496                    SchemaPiece::Fixed { size: *wsz }
497                } else {
498                    return Err(SchemaResolutionError::new(format!(
499                        "Fixed schema {:?}: sizes don't match ({}, {}) for field `{}`",
500                        &rs.name,
501                        wsz,
502                        rsz,
503                        self.get_current_human_readable_path(),
504                    ))
505                    .into());
506                }
507            }
508            (
509                SchemaPiece::Decimal {
510                    precision: wp,
511                    scale: wscale,
512                    fixed_size: wsz,
513                },
514                SchemaPiece::Decimal {
515                    precision: rp,
516                    scale: rscale,
517                    fixed_size: rsz,
518                },
519            ) => {
520                if wp != rp {
521                    return Err(SchemaResolutionError::new(format!(
522                        "Decimal schema {:?}: precisions don't match: {}, {} for field `{}`",
523                        &rs.name,
524                        wp,
525                        rp,
526                        self.get_current_human_readable_path(),
527                    ))
528                    .into());
529                }
530                if wscale != rscale {
531                    return Err(SchemaResolutionError::new(format!(
532                        "Decimal schema {:?}: sizes don't match: {}, {} for field `{}`",
533                        &rs.name,
534                        wscale,
535                        rscale,
536                        self.get_current_human_readable_path(),
537                    ))
538                    .into());
539                }
540                if wsz != rsz {
541                    return Err(SchemaResolutionError::new(format!(
542                        "Decimal schema {:?}: sizes don't match: {:?}, {:?} for field `{}`",
543                        &rs.name,
544                        wsz,
545                        rsz,
546                        self.get_current_human_readable_path(),
547                    ))
548                    .into());
549                }
550                SchemaPiece::Decimal {
551                    precision: *wp,
552                    scale: *wscale,
553                    fixed_size: *wsz,
554                }
555            }
556            (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Fixed { size })
557                if *fixed_size == Some(*size) =>
558            {
559                SchemaPiece::Fixed { size: *size }
560            }
561            (
562                SchemaPiece::Fixed { size },
563                SchemaPiece::Decimal {
564                    precision,
565                    scale,
566                    fixed_size,
567                },
568            ) if *fixed_size == Some(*size) => SchemaPiece::Decimal {
569                precision: *precision,
570                scale: *scale,
571                fixed_size: *fixed_size,
572            },
573
574            (_, SchemaPiece::ResolveRecord { .. })
575            | (_, SchemaPiece::ResolveEnum { .. })
576            | (SchemaPiece::ResolveRecord { .. }, _)
577            | (SchemaPiece::ResolveEnum { .. }, _) => {
578                return Err(SchemaResolutionError::new(
579                    "Attempted to resolve an already resolved schema".to_string(),
580                )
581                .into());
582            }
583
584            (_wt, _rt) => {
585                return Err(SchemaResolutionError::new(format!(
586                    "Non-matching schemas: writer: {:?}, reader: {:?}",
587                    ws.name, rs.name
588                ))
589                .into());
590            }
591        };
592        Ok(typ)
593    }
594
595    pub fn resolve(
596        &mut self,
597        writer: SchemaNodeOrNamed,
598        reader: SchemaNodeOrNamed,
599    ) -> Result<SchemaPieceOrNamed, AvroError> {
600        let previous_human_readable_path_start = self.current_human_readable_path_start;
601        let (_, named_node) = reader.inner.get_piece_and_name(reader.root);
602        if let Some(full_name) = named_node {
603            self.current_human_readable_path_start = self.human_readable_field_path.len();
604            self.human_readable_field_path.push(full_name.human_name());
605        }
606
607        let inner = match (writer.inner, reader.inner) {
608            // Both schemas are unions - the most complicated case, but simpler than it looks.
609            // For each variant in the writer, we attempt to find a matching variant in the reader,
610            // either by type (for anonymous nodes) or by name (for named nodes).
611            //
612            // Having found a match, we resolve the writer variant against the reader variant,
613            // and record it in the resolved node.
614            //
615            // If either no match is found, or resolution on the matches fails, it is not an error
616            // -- it simply means that the corresponding entry in `permutation` will be `None`,
617            // and reading will fail if that variant is expressed. But
618            // reading variants that *do* match and resolve will still be possible.
619            //
620            // See the doc comment on `SchemaPiece::ResolveUnionUnion` for an explanation of the format of `permutation`.
621            (
622                SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)),
623                SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner)),
624            ) => {
625                let w2r = self.writer_to_reader_names.clone();
626                // permutation[1] is Some((j, val)) iff the i'th writer variant
627                // _matches_ the j'th reader variant
628                // (i.e., it is the same primitive type, or the same kind of named type and has the same name, or a decimal with the same parameters)
629                // and successfully _resolves_ against it,
630                // and None otherwise.
631                //
632                // An example of types that match but don't resolve would be two records with the same name but incompatible fields.
633                let permutation = w_inner
634                    .variants()
635                    .iter()
636                    .map(|w_variant| {
637                        let (r_idx, r_variant) =
638                            r_inner.match_(w_variant, &w2r).ok_or_else(|| {
639                                SchemaResolutionError::new(format!(
640                                    "Failed to match writer union variant `{}` against any variant in the reader for field `{}`",
641                                    w_variant.get_human_name(writer.root),
642                                    self.get_current_human_readable_path()
643                                ))
644                            })?;
645                        let resolved =
646                            self.resolve(writer.step(w_variant), reader.step(r_variant))?;
647                        Ok((r_idx, resolved))
648                    })
649                    .collect();
650                let n_reader_variants = r_inner.variants().len();
651                let reader_null_variant = r_inner
652                    .variants()
653                    .iter()
654                    .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
655                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionUnion {
656                    permutation,
657                    n_reader_variants,
658                    reader_null_variant,
659                })
660            }
661            // Writer is concrete; reader is union
662            (other, SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner))) => {
663                let n_reader_variants = r_inner.variants().len();
664                let reader_null_variant = r_inner
665                    .variants()
666                    .iter()
667                    .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
668                let (index, r_inner) = r_inner
669                    .match_ref(other, &self.writer_to_reader_names)
670                    .ok_or_else(|| {
671                        SchemaResolutionError::new(
672                            format!("No matching schema in reader union for writer type `{}` for field `{}`",
673                                    other.get_human_name(writer.root),
674                                    self.get_current_human_readable_path()))
675                    })?;
676                let inner = Box::new(self.resolve(writer.step_ref(other), reader.step(r_inner))?);
677                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveConcreteUnion {
678                    index,
679                    inner,
680                    n_reader_variants,
681                    reader_null_variant,
682                })
683            }
684            // Writer is union; reader is concrete
685            (SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)), other) => {
686                let (index, w_inner) = w_inner
687                    .match_ref(other, &self.reader_to_writer_names)
688                    .ok_or_else(|| {
689                        SchemaResolutionError::new(
690                            format!("No matching schema in writer union for reader type `{}` for field `{}`",
691                                    other.get_human_name(writer.root),
692                                    self.get_current_human_readable_path()))
693                    })?;
694                let inner = Box::new(self.resolve(writer.step(w_inner), reader.step_ref(other))?);
695                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionConcrete { index, inner })
696            }
697            // Any other anonymous type.
698            (SchemaPieceRefOrNamed::Piece(wp), SchemaPieceRefOrNamed::Piece(rp)) => {
699                match (wp, rp) {
700                    // Normally for types that are underlyingly "long", we just interpret them according to the reader schema.
701                    // In this special case, it is better to interpret them according to the _writer_ schema:
702                    // By treating the written value as millis, we will decode the same DateTime values as were written.
703                    //
704                    // For example: if a writer wrote milliseconds and a reader tries to read it as microseconds,
705                    // it will be off by a factor of 1000 from the timestamp that the writer was intending to write
706                    (SchemaPiece::TimestampMilli, SchemaPiece::TimestampMicro) => {
707                        SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMilli)
708                    }
709                    // See above
710                    (SchemaPiece::TimestampMicro, SchemaPiece::TimestampMilli) => {
711                        SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMicro)
712                    }
713                    (SchemaPiece::Date, SchemaPiece::TimestampMilli)
714                    | (SchemaPiece::Date, SchemaPiece::TimestampMicro) => {
715                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveDateTimestamp)
716                    }
717                    (wp, rp) if wp.is_underlying_int() && rp.is_underlying_int() => {
718                        SchemaPieceOrNamed::Piece(rp.clone()) // This clone is just a copy - none of the underlying int/long types own heap memory.
719                    }
720                    (wp, rp) if wp.is_underlying_long() && rp.is_underlying_long() => {
721                        SchemaPieceOrNamed::Piece(rp.clone()) // see above comment
722                    }
723                    (wp, SchemaPiece::TimestampMilli) if wp.is_underlying_int() => {
724                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMilli)
725                    }
726                    (wp, SchemaPiece::TimestampMicro) if wp.is_underlying_int() => {
727                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMicro)
728                    }
729                    (SchemaPiece::Null, SchemaPiece::Null) => {
730                        SchemaPieceOrNamed::Piece(SchemaPiece::Null)
731                    }
732                    (SchemaPiece::Boolean, SchemaPiece::Boolean) => {
733                        SchemaPieceOrNamed::Piece(SchemaPiece::Boolean)
734                    }
735                    (SchemaPiece::Int, SchemaPiece::Long) => {
736                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntLong)
737                    }
738                    (SchemaPiece::Int, SchemaPiece::Float) => {
739                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntFloat)
740                    }
741                    (SchemaPiece::Int, SchemaPiece::Double) => {
742                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntDouble)
743                    }
744                    (SchemaPiece::Long, SchemaPiece::Float) => {
745                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongFloat)
746                    }
747                    (SchemaPiece::Long, SchemaPiece::Double) => {
748                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongDouble)
749                    }
750                    (SchemaPiece::Float, SchemaPiece::Float) => {
751                        SchemaPieceOrNamed::Piece(SchemaPiece::Float)
752                    }
753                    (SchemaPiece::Float, SchemaPiece::Double) => {
754                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveFloatDouble)
755                    }
756                    (SchemaPiece::Double, SchemaPiece::Double) => {
757                        SchemaPieceOrNamed::Piece(SchemaPiece::Double)
758                    }
759                    (b, SchemaPiece::Bytes)
760                        if b == &SchemaPiece::Bytes || b == &SchemaPiece::String =>
761                    {
762                        SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
763                    }
764                    (s, SchemaPiece::String)
765                        if s == &SchemaPiece::String || s == &SchemaPiece::Bytes =>
766                    {
767                        SchemaPieceOrNamed::Piece(SchemaPiece::String)
768                    }
769                    (SchemaPiece::Array(w_inner), SchemaPiece::Array(r_inner)) => {
770                        let inner =
771                            self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
772                        SchemaPieceOrNamed::Piece(SchemaPiece::Array(Box::new(inner)))
773                    }
774                    (SchemaPiece::Map(w_inner), SchemaPiece::Map(r_inner)) => {
775                        let inner =
776                            self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
777                        SchemaPieceOrNamed::Piece(SchemaPiece::Map(Box::new(inner)))
778                    }
779                    (
780                        SchemaPiece::Decimal {
781                            precision: wp,
782                            scale: ws,
783                            fixed_size: wf,
784                        },
785                        SchemaPiece::Decimal {
786                            precision: rp,
787                            scale: rs,
788                            fixed_size: rf,
789                        },
790                    ) => {
791                        if wp == rp && ws == rs && wf == rf {
792                            SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
793                                precision: *wp,
794                                scale: *ws,
795                                fixed_size: *wf,
796                            })
797                        } else {
798                            return Err(SchemaResolutionError::new(format!(
799                                "Decimal types must match in precision, scale, and fixed size. \
800                                Got ({:?}, {:?}, {:?}); ({:?}, {:?}. {:?}) for field `{}`",
801                                wp,
802                                ws,
803                                wf,
804                                rp,
805                                rs,
806                                rf,
807                                self.get_current_human_readable_path(),
808                            ))
809                            .into());
810                        }
811                    }
812                    (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Bytes)
813                        if *fixed_size == None =>
814                    {
815                        SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
816                    }
817                    // TODO [btv] We probably want to rethink what we're doing here, rather than just add
818                    // a new branch for every possible "logical" type. Perhaps logical types with the
819                    // same underlying type should always be resolvable to the reader schema's type?
820                    (SchemaPiece::Json, SchemaPiece::Json) => {
821                        SchemaPieceOrNamed::Piece(SchemaPiece::Json)
822                    }
823                    (SchemaPiece::Uuid, SchemaPiece::Uuid) => {
824                        SchemaPieceOrNamed::Piece(SchemaPiece::Uuid)
825                    }
826                    (
827                        SchemaPiece::Bytes,
828                        SchemaPiece::Decimal {
829                            precision,
830                            scale,
831                            fixed_size,
832                        },
833                    ) if *fixed_size == None => SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
834                        precision: *precision,
835                        scale: *scale,
836                        fixed_size: *fixed_size,
837                    }),
838                    (ws, rs) => {
839                        return Err(SchemaResolutionError::new(format!(
840                            "Writer schema has type `{:?}`, but reader schema has type `{:?}` for field `{}`",
841                            ws,
842                            rs,
843                            self.get_current_human_readable_path(),
844                        ))
845                        .into());
846                    }
847                }
848            }
849            // Named types
850            (SchemaPieceRefOrNamed::Named(w_index), SchemaPieceRefOrNamed::Named(r_index)) => {
851                if self.writer_to_reader_names.get(&w_index) != Some(&r_index) {
852                    // The nodes in the two schemas have different names. Resolution fails.
853                    let (w_name, r_name) = (
854                        &writer.root.lookup(w_index).name,
855                        &reader.root.lookup(r_index).name,
856                    );
857                    return Err(SchemaResolutionError::new(format!("Attempted to resolve writer schema node named {:?} against reader schema node named {:?}", w_name, r_name)).into());
858                }
859                // Check if we have already resolved the name previously, and if so, return a reference to
860                // it (in the new schema's namespace).
861                let idx = match self.reader_to_resolved_names.get(&r_index) {
862                    Some(resolved) => *resolved,
863                    None => {
864                        // We have not resolved this name yet; do so, and record it in the set of named schemas.
865                        // We need to push a placeholder beforehand, because schemas can be recursive;
866                        // a schema nested under this one may reference it.
867                        // A plausible example: {"type": "record", "name": "linked_list", "fields": [{"name": "next", "type": ["null", "linked_list"]}]}
868                        // Thus, `self.reader_to_resolved_names` needs to be correct for this node's index *before* we traverse the nodes under it.
869                        let resolved_idx = self.named.len();
870                        self.reader_to_resolved_names.insert(r_index, resolved_idx);
871                        self.named.push(None);
872                        let piece =
873                            match self.resolve_named(writer.root, reader.root, w_index, r_index) {
874                                Ok(piece) => piece,
875                                Err(e) => {
876                                    // clean up the placeholder values that were added above.
877                                    self.named.pop();
878                                    self.reader_to_resolved_names.remove(&r_index);
879                                    return Err(e);
880                                }
881                            };
882                        let name = &self.reader_schema.named[r_index].name;
883                        let ns = NamedSchemaPiece {
884                            name: name.clone(),
885                            piece,
886                        };
887                        self.named[resolved_idx] = Some(ns);
888                        self.indices.insert(name.clone(), resolved_idx);
889
890                        resolved_idx
891                    }
892                };
893                SchemaPieceOrNamed::Named(idx)
894            }
895            (ws, rs) => {
896                return Err(SchemaResolutionError::new(format!(
897                    "Schemas don't match: {:?}, {:?} for field `{}`",
898                    ws.get_piece_and_name(writer.root).0,
899                    rs.get_piece_and_name(reader.root).0,
900                    self.get_current_human_readable_path(),
901                ))
902                .into());
903            }
904        };
905        if named_node.is_some() {
906            self.human_readable_field_path.pop();
907            self.current_human_readable_path_start = previous_human_readable_path_start;
908        }
909        Ok(inner)
910    }
911
912    fn get_current_human_readable_path(&self) -> String {
913        self.human_readable_field_path[self.current_human_readable_path_start..].join(".")
914    }
915}
916
917/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
918/// to read from.
919///
920/// In case a reader `Schema` is provided, schema resolution will also be performed.
921///
922/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
923/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
924/// you are doing, instead.
925pub fn from_avro_datum<R: AvroRead>(schema: &Schema, reader: &mut R) -> Result<Value, AvroError> {
926    let value = decode(schema.top_node(), reader)?;
927    Ok(value)
928}
929
930#[cfg(test)]
931mod tests {
932    use std::io::Cursor;
933
934    use mz_ore::assert_err;
935
936    use crate::Reader;
937    use crate::types::{Record, ToAvro};
938
939    use super::*;
940
    /// Record schema shared by the tests: a `long` field `a` (default 42) and a `string`
    /// field `b`.
    static SCHEMA: &str = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
    /// Union schema whose branches are `null` (index 0) and `long` (index 1).
    static UNION_SCHEMA: &str = r#"
            ["null", "long"]
        "#;
    /// A complete Avro object container file: the `Obj\x01` magic, a metadata map holding
    /// `avro.schema` (a compact form of `SCHEMA`) and `avro.codec` (`null`), and a 16-byte
    /// sync marker, followed by a single data block of two records -- (27, "foo") and
    /// (42, "bar") -- and the sync marker again.
    ///
    /// Several tests below cut this buffer at hard-coded offsets, so the byte layout must not
    /// change without updating them.
    static ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];
970
971    #[mz_ore::test]
972    fn test_from_avro_datum() {
973        let schema: Schema = SCHEMA.parse().unwrap();
974        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
975
976        let mut record = Record::new(schema.top_node()).unwrap();
977        record.put("a", 27i64);
978        record.put("b", "foo");
979        let expected = record.avro();
980
981        assert_eq!(from_avro_datum(&schema, &mut encoded).unwrap(), expected);
982    }
983
984    #[mz_ore::test]
985    fn test_null_union() {
986        let schema: Schema = UNION_SCHEMA.parse().unwrap();
987        let mut encoded: &'static [u8] = &[2, 0];
988
989        assert_eq!(
990            from_avro_datum(&schema, &mut encoded).unwrap(),
991            Value::Union {
992                index: 1,
993                inner: Box::new(Value::Long(0)),
994                n_variants: 2,
995                null_variant: Some(0)
996            }
997        );
998    }
999
1000    #[mz_ore::test]
1001    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
1002    fn test_reader_stream() {
1003        let schema: Schema = SCHEMA.parse().unwrap();
1004        let reader = Reader::with_schema(&schema, ENCODED).unwrap();
1005
1006        let mut record1 = Record::new(schema.top_node()).unwrap();
1007        record1.put("a", 27i64);
1008        record1.put("b", "foo");
1009
1010        let mut record2 = Record::new(schema.top_node()).unwrap();
1011        record2.put("a", 42i64);
1012        record2.put("b", "bar");
1013
1014        let expected = [record1.avro(), record2.avro()];
1015
1016        for (i, value) in reader.enumerate() {
1017            assert_eq!(value.unwrap(), expected[i]);
1018        }
1019    }
1020
1021    #[mz_ore::test]
1022    fn test_reader_invalid_header() {
1023        let schema: Schema = SCHEMA.parse().unwrap();
1024        let invalid = ENCODED.iter().skip(1).copied().collect::<Vec<u8>>();
1025        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
1026    }
1027
1028    #[mz_ore::test]
1029    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
1030    fn test_reader_invalid_block() {
1031        let schema: Schema = SCHEMA.parse().unwrap();
1032        let invalid = ENCODED
1033            .iter()
1034            .rev()
1035            .skip(19)
1036            .copied()
1037            .collect::<Vec<u8>>()
1038            .into_iter()
1039            .rev()
1040            .collect::<Vec<u8>>();
1041        let reader = Reader::with_schema(&schema, &invalid[..]).unwrap();
1042        for value in reader {
1043            assert_err!(value);
1044        }
1045    }
1046
1047    #[mz_ore::test]
1048    fn test_reader_empty_buffer() {
1049        let empty = Cursor::new(Vec::new());
1050        assert!(Reader::new(empty).is_err());
1051    }
1052
1053    #[mz_ore::test]
1054    fn test_reader_only_header() {
1055        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
1056        let reader = Reader::new(&invalid[..]).unwrap();
1057        for value in reader {
1058            assert_err!(value);
1059        }
1060    }
1061
1062    #[mz_ore::test]
1063    fn test_resolution_nested_types_error() {
1064        let r = r#"
1065{
1066    "type": "record",
1067    "name": "com.materialize.foo",
1068    "fields": [
1069        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "int"}]}}
1070    ]
1071}
1072"#;
1073        let w = r#"
1074{
1075    "type": "record",
1076    "name": "com.materialize.foo",
1077    "fields": [
1078        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "double"}]}}
1079    ]
1080}
1081"#;
1082        let r: Schema = r.parse().unwrap();
1083        let w: Schema = w.parse().unwrap();
1084        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
1085            resolve_schemas(&w, &r)
1086        {
1087            s
1088        } else {
1089            panic!("Expected schema resolution failure");
1090        };
1091        // The field name here must NOT contain `com.materialize.foo`,
1092        // because explicitly named types are all relative to a global
1093        // namespace (i.e., they don't nest).
1094        assert_eq!(
1095            &err_str,
1096            "Writer schema has type `Double`, but reader schema has type `Int` for field `com.materialize.bar.f1_1`"
1097        );
1098    }
1099
1100    #[mz_ore::test]
1101    fn test_extra_fields_without_default_error() {
1102        let r = r#"
1103{
1104    "type": "record",
1105    "name": "com.materialize.foo",
1106    "fields": [
1107        {"name": "f1", "type": "int"},
1108        {"name": "f2", "type": "int"}
1109    ]
1110}
1111"#;
1112        let w = r#"
1113{
1114    "type": "record",
1115    "name": "com.materialize.foo",
1116    "fields": [
1117        {"name": "f1", "type": "int"}
1118    ]
1119}
1120"#;
1121        let r: Schema = r.parse().unwrap();
1122        let w: Schema = w.parse().unwrap();
1123        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
1124            resolve_schemas(&w, &r)
1125        {
1126            s
1127        } else {
1128            panic!("Expected schema resolution failure");
1129        };
1130        assert_eq!(
1131            &err_str,
1132            "Reader field `com.materialize.foo.f2` not found in writer, and has no default"
1133        );
1134    }
1135
1136    #[mz_ore::test]
1137    fn test_duplicate_field_error() {
1138        let r = r#"
1139{
1140    "type": "record",
1141    "name": "com.materialize.bar",
1142    "fields": [
1143        {"name": "f1", "type": "int"},
1144        {"name": "f1", "type": "int"}
1145    ]
1146}
1147"#;
1148        let w = r#"
1149{
1150    "type": "record",
1151    "name": "com.materialize.bar",
1152    "fields": [
1153        {"name": "f1", "type": "int"}
1154    ]
1155}
1156"#;
1157        let r: Schema = r.parse().unwrap();
1158        let w: Schema = w.parse().unwrap();
1159        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
1160            resolve_schemas(&w, &r)
1161        {
1162            s
1163        } else {
1164            panic!("Expected schema resolution failure");
1165        };
1166        assert_eq!(
1167            &err_str,
1168            "Duplicate field `com.materialize.bar.f1` in schema"
1169        );
1170    }
1171
1172    #[mz_ore::test]
1173    fn test_decimal_field_mismatch_error() {
1174        let r = r#"
1175{
1176    "type": "record",
1177    "name": "com.materialize.foo",
1178    "fields": [
1179        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}
1180    ]
1181}
1182"#;
1183        let w = r#"
1184{
1185    "type": "record",
1186    "name": "com.materialize.foo",
1187    "fields": [
1188        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 1}}
1189    ]
1190}
1191"#;
1192        let r: Schema = r.parse().unwrap();
1193        let w: Schema = w.parse().unwrap();
1194        let err_str = if let Result::Err(AvroError::ResolveSchema(SchemaResolutionError(s))) =
1195            resolve_schemas(&w, &r)
1196        {
1197            s
1198        } else {
1199            panic!("Expected schema resolution failure");
1200        };
1201        assert_eq!(
1202            &err_str,
1203            "Decimal types must match in precision, scale, and fixed size. Got (5, 1, None); (4, 2. None) for field `com.materialize.foo.f1`"
1204        );
1205    }
1206}