Skip to main content

mz_avro/
reader.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic handling reading from Avro format at user level.
25
26use std::collections::BTreeMap;
27use std::str::{FromStr, from_utf8};
28
29use aws_lc_rs::digest;
30use serde_json::from_slice;
31
32use crate::decode::{AvroRead, decode};
33use crate::error::{DecodeError, Error as AvroError};
34use crate::schema::{
35    FullName, NamedSchemaPiece, ParseSchemaError, RecordField, ResolvedDefaultValueField,
36    ResolvedRecordField, Schema, SchemaNodeOrNamed, SchemaPiece, SchemaPieceOrNamed,
37    SchemaPieceRefOrNamed, resolve_schemas,
38};
39use crate::types::Value;
40use crate::{Codec, SchemaResolutionError, util};
41
42#[derive(Debug, Clone)]
43pub(crate) struct Header {
44    writer_schema: Schema,
45    marker: [u8; 16],
46    codec: Codec,
47}
48
49impl Header {
50    pub fn from_reader<R: AvroRead>(reader: &mut R) -> Result<Header, AvroError> {
51        let meta_schema = Schema {
52            named: vec![],
53            indices: Default::default(),
54            top: SchemaPiece::Map(Box::new(SchemaPiece::Bytes.into())).into(),
55        };
56
57        let mut buf = [0u8; 4];
58        reader.read_exact(&mut buf)?;
59
60        if buf != [b'O', b'b', b'j', 1u8] {
61            return Err(AvroError::Decode(DecodeError::WrongHeaderMagic(buf)));
62        }
63
64        if let Value::Map(meta) = decode(meta_schema.top_node(), reader)? {
65            // TODO: surface original parse schema errors instead of coalescing them here
66            let json = meta
67                .get("avro.schema")
68                .ok_or(AvroError::Decode(DecodeError::MissingAvroDotSchema))
69                .and_then(|bytes| {
70                    if let Value::Bytes(ref bytes) = *bytes {
71                        from_slice(bytes.as_ref()).map_err(|e| {
72                            AvroError::ParseSchema(ParseSchemaError::new(format!(
73                                "unable to decode schema bytes: {}",
74                                e
75                            )))
76                        })
77                    } else {
78                        unreachable!()
79                    }
80                })?;
81            let writer_schema = Schema::parse(&json).map_err(|e| {
82                ParseSchemaError::new(format!("unable to parse json as avro schema: {}", e))
83            })?;
84
85            let codec = meta
86                .get("avro.codec")
87                .map(|val| match val {
88                    Value::Bytes(bytes) => from_utf8(bytes.as_ref())
89                        .map_err(|_e| AvroError::Decode(DecodeError::CodecUtf8Error))
90                        .and_then(|codec| {
91                            Codec::from_str(codec).map_err(|_| {
92                                AvroError::Decode(DecodeError::UnrecognizedCodec(codec.to_string()))
93                            })
94                        }),
95                    _ => unreachable!(),
96                })
97                .unwrap_or(Ok(Codec::Null))?;
98
99            let mut marker = [0u8; 16];
100            reader.read_exact(&mut marker)?;
101
102            Ok(Header {
103                writer_schema,
104                marker,
105                codec,
106            })
107        } else {
108            unreachable!()
109        }
110    }
111
112    pub fn into_parts(self) -> (Schema, [u8; 16], Codec) {
113        (self.writer_schema, self.marker, self.codec)
114    }
115}
116
/// Reads Avro values from an object container file.
///
/// Values are decoded one block at a time: a block's bytes are read into
/// `buf` and decompressed with the header's codec, then individual values are
/// decoded from it by `read_next` (or the `Iterator` impl).
pub struct Reader<R> {
    /// Header read from the start of the stream (writer schema, sync marker, codec).
    header: Header,
    /// The underlying source of bytes.
    inner: R,
    /// Set once the `Iterator` impl has yielded an error; it then only returns `None`.
    errored: bool,
    /// Writer schema resolved against the reader schema given to `with_schema`;
    /// `None` when no reader schema was given or the two fingerprints are equal.
    resolved_schema: Option<Schema>,
    /// Number of values in the current block that have not been decoded yet.
    messages_remaining: usize,
    // Internal buffering to reduce allocation.
    /// Bytes of the current block, after decompression.
    buf: Vec<u8>,
    /// Offset into `buf` of the next value to decode.
    buf_idx: usize,
}

/// An iterator over the `Block`s of a `Reader`
///
/// Each item is one whole block (its bytes plus object count); the values
/// inside the block are not decoded.
pub struct BlockIter<R> {
    inner: Reader<R>,
}

/// A block of Avro objects from an OCF file
#[derive(Debug, Clone)]
pub struct Block {
    /// The block's bytes, already decompressed with the file's codec; the
    /// objects they contain are still Avro-encoded.
    pub bytes: Vec<u8>,
    /// The number of Avro objects in the block
    pub len: usize,
}
141
142impl<R: AvroRead> BlockIter<R> {
143    pub fn with_schema(reader_schema: &Schema, inner: R) -> Result<Self, AvroError> {
144        Ok(Self {
145            inner: Reader::with_schema(reader_schema, inner)?,
146        })
147    }
148}
149
150impl<R: AvroRead> Iterator for BlockIter<R> {
151    type Item = Result<Block, AvroError>;
152
153    fn next(&mut self) -> Option<Self::Item> {
154        assert!(self.inner.is_empty());
155
156        match self.inner.read_block_next() {
157            Ok(()) => {
158                if self.inner.is_empty() {
159                    None
160                } else {
161                    let bytes = std::mem::take(&mut self.inner.buf);
162                    let len = std::mem::take(&mut self.inner.messages_remaining);
163                    Some(Ok(Block { bytes, len }))
164                }
165            }
166            Err(e) => Some(Err(e)),
167        }
168    }
169}
170
171impl<R: AvroRead> Reader<R> {
172    /// Creates a `Reader` given something implementing the `tokio::io::AsyncRead` trait to read from.
173    /// No reader `Schema` will be set.
174    ///
175    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
176    pub fn new(mut inner: R) -> Result<Reader<R>, AvroError> {
177        let header = Header::from_reader(&mut inner)?;
178        let reader = Reader {
179            header,
180            inner,
181            errored: false,
182            resolved_schema: None,
183            messages_remaining: 0,
184            buf: vec![],
185            buf_idx: 0,
186        };
187        Ok(reader)
188    }
189
190    /// Creates a `Reader` given a reader `Schema` and something implementing the `tokio::io::AsyncRead` trait
191    /// to read from.
192    ///
193    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
194    pub fn with_schema(reader_schema: &Schema, mut inner: R) -> Result<Reader<R>, AvroError> {
195        let header = Header::from_reader(&mut inner)?;
196
197        let writer_schema = &header.writer_schema;
198        let resolved_schema = if reader_schema.fingerprint(&digest::SHA256).bytes
199            != writer_schema.fingerprint(&digest::SHA256).bytes
200        {
201            Some(resolve_schemas(writer_schema, reader_schema)?)
202        } else {
203            None
204        };
205
206        Ok(Reader {
207            header,
208            errored: false,
209            resolved_schema,
210            inner,
211            messages_remaining: 0,
212            buf: vec![],
213            buf_idx: 0,
214        })
215    }
216
217    /// Get a reference to the writer `Schema`.
218    pub fn writer_schema(&self) -> &Schema {
219        &self.header.writer_schema
220    }
221
222    /// Get a reference to the resolved schema
223    /// (or just the writer schema, if no reader schema was provided
224    ///  or the two schemas are identical)
225    pub fn schema(&self) -> &Schema {
226        match &self.resolved_schema {
227            Some(schema) => schema,
228            None => self.writer_schema(),
229        }
230    }
231
232    #[inline]
233    /// Read the next Avro value from the file, if one exists.
234    pub fn read_next(&mut self) -> Result<Option<Value>, AvroError> {
235        if self.is_empty() {
236            self.read_block_next()?;
237            if self.is_empty() {
238                return Ok(None);
239            }
240        }
241
242        let mut block_bytes = &self.buf[self.buf_idx..];
243        let b_original = block_bytes.len();
244        let schema = self.schema();
245        let item = from_avro_datum(schema, &mut block_bytes)?;
246        self.buf_idx += b_original - block_bytes.len();
247        self.messages_remaining -= 1;
248        Ok(Some(item))
249    }
250
251    fn is_empty(&self) -> bool {
252        self.messages_remaining == 0
253    }
254
255    fn fill_buf(&mut self, n: usize) -> Result<(), AvroError> {
256        // We don't have enough space in the buffer, need to grow it.
257        if n >= self.buf.len() {
258            self.buf.resize(n, 0);
259        }
260
261        self.inner.read_exact(&mut self.buf[..n])?;
262        self.buf_idx = 0;
263        Ok(())
264    }
265
266    fn read_block_next(&mut self) -> Result<(), AvroError> {
267        assert!(self.is_empty(), "Expected self to be empty!");
268        match util::read_long(&mut self.inner) {
269            Ok(block_len) => {
270                // The object count is read straight from the wire; cap it like
271                // every other wire-read length (and reject negatives, which wrap
272                // to a huge `usize`). Otherwise a crafted block with a huge count
273                // and a zero-byte schema (e.g. `null`) makes the reader spin
274                // decoding billions of empty values. Found by the reader_decode
275                // cargo-fuzz target.
276                self.messages_remaining = util::safe_len(block_len as usize)?;
277                let block_bytes = util::safe_len(util::read_long(&mut self.inner)? as usize)?;
278                self.fill_buf(block_bytes)?;
279                let mut marker = [0u8; 16];
280                self.inner.read_exact(&mut marker)?;
281
282                if marker != self.header.marker {
283                    return Err(DecodeError::MismatchedBlockHeader {
284                        expected: self.header.marker,
285                        actual: marker,
286                    }
287                    .into());
288                }
289
290                // NOTE (JAB): This doesn't fit this Reader pattern very well.
291                // `self.buf` is a growable buffer that is reused as the reader is iterated.
292                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
293                // and replace `buf` with the new one, instead of reusing the same buffer.
294                // We can address this by using some "limited read" type to decode directly
295                // into the buffer. But this is fine, for now.
296                self.header.codec.decompress(&mut self.buf)?;
297
298                Ok(())
299            }
300            Err(e) => {
301                if let AvroError::IO(std::io::ErrorKind::UnexpectedEof) = e {
302                    // to not return any error in case we only finished to read cleanly from the stream
303                    Ok(())
304                } else {
305                    Err(e)
306                }
307            }
308        }
309    }
310}
311
312impl<R: AvroRead> Iterator for Reader<R> {
313    type Item = Result<Value, AvroError>;
314
315    fn next(&mut self) -> Option<Self::Item> {
316        // to prevent continuing to read after the first error occurs
317        if self.errored {
318            return None;
319        };
320        match self.read_next() {
321            Ok(opt) => opt.map(Ok),
322            Err(e) => {
323                self.errored = true;
324                Some(Err(e))
325            }
326        }
327    }
328}
329
/// State threaded through the resolution of a writer schema against a
/// reader schema (see `resolve` / `resolve_named`).
pub struct SchemaResolver<'a> {
    // NOTE(review): presumably the named types of the resolved schema being
    // built, and their lookup by full name — not used in this chunk; confirm.
    pub named: Vec<Option<NamedSchemaPiece>>,
    pub indices: BTreeMap<FullName, usize>,
    /// Field and type names leading to the node being resolved; used to build
    /// human-readable paths in error messages.
    pub human_readable_field_path: Vec<String>,
    /// Index into `human_readable_field_path` at which the innermost named
    /// type's entry was pushed.
    pub current_human_readable_path_start: usize,
    /// Writer-to-reader named-type index map, consulted when matching a
    /// writer value against reader union variants.
    pub writer_to_reader_names: BTreeMap<usize, usize>,
    /// Reader-to-writer named-type index map, consulted when matching a
    /// reader type against writer union variants.
    pub reader_to_writer_names: BTreeMap<usize, usize>,
    // NOTE(review): not used in this chunk; presumably maps reader named-type
    // indices to their index in the resolved schema — confirm.
    pub reader_to_resolved_names: BTreeMap<usize, usize>,
    // NOTE(review): the reason this field is currently unused is not visible here.
    #[allow(dead_code)]
    pub reader_fullnames: BTreeMap<usize, &'a FullName>,
    pub reader_schema: &'a Schema,
}
342
343impl<'a> SchemaResolver<'a> {
344    fn resolve_named(
345        &mut self,
346        writer: &Schema,
347        reader: &Schema,
348        writer_index: usize,
349        reader_index: usize,
350    ) -> Result<SchemaPiece, AvroError> {
351        let ws = writer.lookup(writer_index);
352        let rs = reader.lookup(reader_index);
353        let typ = match (&ws.piece, &rs.piece) {
354            (
355                SchemaPiece::Record {
356                    fields: w_fields,
357                    lookup: w_lookup,
358                    ..
359                },
360                SchemaPiece::Record {
361                    fields: r_fields,
362                    lookup: _r_lookup,
363                    ..
364                },
365            ) => {
366                let mut defaults = Vec::new();
367                let mut fields: Vec<Option<RecordField>> = Vec::new();
368                for (r_index, rf) in r_fields.iter().enumerate() {
369                    match w_lookup.get(&rf.name) {
370                        None => {
371                            let default_field = match &rf.default {
372                                Some(v) => ResolvedDefaultValueField {
373                                    name: rf.name.clone(),
374                                    doc: rf.doc.clone(),
375                                    default: reader
376                                        .top_node_or_named()
377                                        .step(&rf.schema)
378                                        .lookup()
379                                        .json_to_value(v)?,
380                                    order: rf.order.clone(),
381                                    position: r_index,
382                                },
383                                None => return Err(SchemaResolutionError::new(format!(
384                                    "Reader field `{}.{}` not found in writer, and has no default",
385                                    self.get_current_human_readable_path(),
386                                    rf.name
387                                ))
388                                .into()),
389                            };
390                            defaults.push(default_field);
391                        }
392                        Some(w_index) => {
393                            if fields.len() > *w_index && fields[*w_index].is_some() {
394                                return Err(SchemaResolutionError::new(format!(
395                                    "Duplicate field `{}.{}` in schema",
396                                    self.get_current_human_readable_path(),
397                                    rf.name
398                                ))
399                                .into());
400                            }
401                            let wf = &w_fields[*w_index];
402                            let w_node = SchemaNodeOrNamed {
403                                root: writer,
404                                inner: wf.schema.as_ref(),
405                            };
406                            let r_node = SchemaNodeOrNamed {
407                                root: reader,
408                                inner: rf.schema.as_ref(),
409                            };
410
411                            self.human_readable_field_path.push(rf.name.clone());
412                            let new_inner = self.resolve(w_node, r_node)?;
413                            self.human_readable_field_path.pop();
414
415                            let field = RecordField {
416                                name: rf.name.clone(),
417                                doc: rf.doc.clone(),
418                                default: rf.default.clone(),
419                                schema: new_inner,
420                                order: rf.order.clone(),
421                                position: r_index,
422                            };
423                            while fields.len() <= *w_index {
424                                fields.push(None);
425                            }
426                            fields[*w_index] = Some(field)
427                        }
428                    }
429                }
430                while fields.len() < w_fields.len() {
431                    fields.push(None);
432                }
433                let mut n_present = 0;
434                let fields = fields
435                    .into_iter()
436                    .enumerate()
437                    .map(|(i, rf)| match rf {
438                        Some(rf) => {
439                            n_present += 1;
440                            ResolvedRecordField::Present(rf)
441                        }
442                        None => {
443                            // Clone the chunk of the writer schema appearing here.
444                            // We could probably be clever and avoid some cloning,
445                            // but absolute highest performance probably isn't important for schema resolution.
446                            //
447                            // The cloned writer schema piece is needed to guide decoding of the value,
448                            // since even though it doesn't appear in the reader schema it needs
449                            // to be decoded to know where it ends.
450                            //
451                            // TODO -- We could try to come up with a "Dummy" schema variant
452                            // that does only enough decoding to find the end of a value,
453                            // and maybe save some time.
454                            let writer_schema_piece = SchemaNodeOrNamed {
455                                root: writer,
456                                inner: w_fields[i].schema.as_ref(),
457                            }
458                            .to_schema();
459                            ResolvedRecordField::Absent(writer_schema_piece)
460                        }
461                    })
462                    .collect();
463                let n_reader_fields = defaults.len() + n_present;
464                SchemaPiece::ResolveRecord {
465                    defaults,
466                    fields,
467                    n_reader_fields,
468                }
469            }
470            (
471                SchemaPiece::Enum {
472                    symbols: w_symbols, ..
473                },
474                SchemaPiece::Enum {
475                    symbols: r_symbols,
476                    doc,
477                    default_idx,
478                },
479            ) => {
480                let r_map = r_symbols
481                    .iter()
482                    .enumerate()
483                    .map(|(i, s)| (s, i))
484                    .collect::<BTreeMap<_, _>>();
485                let symbols = w_symbols
486                    .iter()
487                    .map(|s| {
488                        r_map
489                            .get(s)
490                            .map(|i| (*i, s.clone()))
491                            .ok_or_else(|| s.clone())
492                    })
493                    .collect();
494                SchemaPiece::ResolveEnum {
495                    doc: doc.clone(),
496                    symbols,
497                    default: default_idx.map(|i| (i, r_symbols[i].clone())),
498                }
499            }
500            (SchemaPiece::Fixed { size: wsz }, SchemaPiece::Fixed { size: rsz }) => {
501                if *wsz == *rsz {
502                    SchemaPiece::Fixed { size: *wsz }
503                } else {
504                    return Err(SchemaResolutionError::new(format!(
505                        "Fixed schema {:?}: sizes don't match ({}, {}) for field `{}`",
506                        &rs.name,
507                        wsz,
508                        rsz,
509                        self.get_current_human_readable_path(),
510                    ))
511                    .into());
512                }
513            }
514            (
515                SchemaPiece::Decimal {
516                    precision: wp,
517                    scale: wscale,
518                    fixed_size: wsz,
519                },
520                SchemaPiece::Decimal {
521                    precision: rp,
522                    scale: rscale,
523                    fixed_size: rsz,
524                },
525            ) => {
526                if wp != rp {
527                    return Err(SchemaResolutionError::new(format!(
528                        "Decimal schema {:?}: precisions don't match: {}, {} for field `{}`",
529                        &rs.name,
530                        wp,
531                        rp,
532                        self.get_current_human_readable_path(),
533                    ))
534                    .into());
535                }
536                if wscale != rscale {
537                    return Err(SchemaResolutionError::new(format!(
538                        "Decimal schema {:?}: sizes don't match: {}, {} for field `{}`",
539                        &rs.name,
540                        wscale,
541                        rscale,
542                        self.get_current_human_readable_path(),
543                    ))
544                    .into());
545                }
546                if wsz != rsz {
547                    return Err(SchemaResolutionError::new(format!(
548                        "Decimal schema {:?}: sizes don't match: {:?}, {:?} for field `{}`",
549                        &rs.name,
550                        wsz,
551                        rsz,
552                        self.get_current_human_readable_path(),
553                    ))
554                    .into());
555                }
556                SchemaPiece::Decimal {
557                    precision: *wp,
558                    scale: *wscale,
559                    fixed_size: *wsz,
560                }
561            }
562            (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Fixed { size })
563                if *fixed_size == Some(*size) =>
564            {
565                SchemaPiece::Fixed { size: *size }
566            }
567            (
568                SchemaPiece::Fixed { size },
569                SchemaPiece::Decimal {
570                    precision,
571                    scale,
572                    fixed_size,
573                },
574            ) if *fixed_size == Some(*size) => SchemaPiece::Decimal {
575                precision: *precision,
576                scale: *scale,
577                fixed_size: *fixed_size,
578            },
579
580            (_, SchemaPiece::ResolveRecord { .. })
581            | (_, SchemaPiece::ResolveEnum { .. })
582            | (SchemaPiece::ResolveRecord { .. }, _)
583            | (SchemaPiece::ResolveEnum { .. }, _) => {
584                return Err(SchemaResolutionError::new(
585                    "Attempted to resolve an already resolved schema".to_string(),
586                )
587                .into());
588            }
589
590            (_wt, _rt) => {
591                return Err(SchemaResolutionError::new(format!(
592                    "Non-matching schemas: writer: {:?}, reader: {:?}",
593                    ws.name, rs.name
594                ))
595                .into());
596            }
597        };
598        Ok(typ)
599    }
600
601    pub fn resolve(
602        &mut self,
603        writer: SchemaNodeOrNamed,
604        reader: SchemaNodeOrNamed,
605    ) -> Result<SchemaPieceOrNamed, AvroError> {
606        let previous_human_readable_path_start = self.current_human_readable_path_start;
607        let (_, named_node) = reader.inner.get_piece_and_name(reader.root);
608        if let Some(full_name) = named_node {
609            self.current_human_readable_path_start = self.human_readable_field_path.len();
610            self.human_readable_field_path.push(full_name.human_name());
611        }
612
613        let inner = match (writer.inner, reader.inner) {
614            // Both schemas are unions - the most complicated case, but simpler than it looks.
615            // For each variant in the writer, we attempt to find a matching variant in the reader,
616            // either by type (for anonymous nodes) or by name (for named nodes).
617            //
618            // Having found a match, we resolve the writer variant against the reader variant,
619            // and record it in the resolved node.
620            //
621            // If either no match is found, or resolution on the matches fails, it is not an error
622            // -- it simply means that the corresponding entry in `permutation` will be `None`,
623            // and reading will fail if that variant is expressed. But
624            // reading variants that *do* match and resolve will still be possible.
625            //
626            // See the doc comment on `SchemaPiece::ResolveUnionUnion` for an explanation of the format of `permutation`.
627            (
628                SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)),
629                SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner)),
630            ) => {
631                let w2r = self.writer_to_reader_names.clone();
632                // permutation[1] is Some((j, val)) iff the i'th writer variant
633                // _matches_ the j'th reader variant
634                // (i.e., it is the same primitive type, or the same kind of named type and has the same name, or a decimal with the same parameters)
635                // and successfully _resolves_ against it,
636                // and None otherwise.
637                //
638                // An example of types that match but don't resolve would be two records with the same name but incompatible fields.
639                let permutation = w_inner
640                    .variants()
641                    .iter()
642                    .map(|w_variant| {
643                        let (r_idx, r_variant) =
644                            r_inner.match_promote_writer(w_variant, &w2r).ok_or_else(|| {
645                                SchemaResolutionError::new(format!(
646                                    "Failed to match writer union variant `{}` against any variant in the reader for field `{}`",
647                                    w_variant.get_human_name(writer.root),
648                                    self.get_current_human_readable_path()
649                                ))
650                            })?;
651                        let resolved =
652                            self.resolve(writer.step(w_variant), reader.step(r_variant))?;
653                        Ok((r_idx, resolved))
654                    })
655                    .collect();
656                let n_reader_variants = r_inner.variants().len();
657                let reader_null_variant = r_inner
658                    .variants()
659                    .iter()
660                    .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
661                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionUnion {
662                    permutation,
663                    n_reader_variants,
664                    reader_null_variant,
665                })
666            }
667            // Writer is concrete; reader is union
668            (other, SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(r_inner))) => {
669                let n_reader_variants = r_inner.variants().len();
670                let reader_null_variant = r_inner
671                    .variants()
672                    .iter()
673                    .position(|v| v == &SchemaPieceOrNamed::Piece(SchemaPiece::Null));
674                let (index, r_inner) = r_inner
675                    .match_ref_promote_writer(other, &self.writer_to_reader_names)
676                    .ok_or_else(|| {
677                        SchemaResolutionError::new(
678                            format!("No matching schema in reader union for writer type `{}` for field `{}`",
679                                    other.get_human_name(writer.root),
680                                    self.get_current_human_readable_path()))
681                    })?;
682                let inner = Box::new(self.resolve(writer.step_ref(other), reader.step(r_inner))?);
683                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveConcreteUnion {
684                    index,
685                    inner,
686                    n_reader_variants,
687                    reader_null_variant,
688                })
689            }
690            // Writer is union; reader is concrete
691            (SchemaPieceRefOrNamed::Piece(SchemaPiece::Union(w_inner)), other) => {
692                let (index, w_inner) = w_inner
693                    .match_ref_promote_reader(other, &self.reader_to_writer_names)
694                    .ok_or_else(|| {
695                        // `other` is the reader's concrete node (the second element
696                        // of the match), so its name must be looked up in the
697                        // reader's schema. Using `writer.root` here indexed the
698                        // writer's (possibly empty) `named` table out of bounds.
699                        SchemaResolutionError::new(
700                            format!("No matching schema in writer union for reader type `{}` for field `{}`",
701                                    other.get_human_name(reader.root),
702                                    self.get_current_human_readable_path()))
703                    })?;
704                let inner = Box::new(self.resolve(writer.step(w_inner), reader.step_ref(other))?);
705                SchemaPieceOrNamed::Piece(SchemaPiece::ResolveUnionConcrete { index, inner })
706            }
707            // Any other anonymous type.
708            (SchemaPieceRefOrNamed::Piece(wp), SchemaPieceRefOrNamed::Piece(rp)) => {
709                match (wp, rp) {
710                    // Normally for types that are underlyingly "long", we just interpret them according to the reader schema.
711                    // In this special case, it is better to interpret them according to the _writer_ schema:
712                    // By treating the written value as millis, we will decode the same DateTime values as were written.
713                    //
714                    // For example: if a writer wrote milliseconds and a reader tries to read it as microseconds,
715                    // it will be off by a factor of 1000 from the timestamp that the writer was intending to write
716                    (SchemaPiece::TimestampMilli, SchemaPiece::TimestampMicro) => {
717                        SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMilli)
718                    }
719                    // See above
720                    (SchemaPiece::TimestampMicro, SchemaPiece::TimestampMilli) => {
721                        SchemaPieceOrNamed::Piece(SchemaPiece::TimestampMicro)
722                    }
723                    (SchemaPiece::Date, SchemaPiece::TimestampMilli)
724                    | (SchemaPiece::Date, SchemaPiece::TimestampMicro) => {
725                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveDateTimestamp)
726                    }
727                    (wp, rp) if wp.is_underlying_int() && rp.is_underlying_int() => {
728                        SchemaPieceOrNamed::Piece(rp.clone()) // This clone is just a copy - none of the underlying int/long types own heap memory.
729                    }
730                    (wp, rp) if wp.is_underlying_long() && rp.is_underlying_long() => {
731                        SchemaPieceOrNamed::Piece(rp.clone()) // see above comment
732                    }
733                    (wp, SchemaPiece::TimestampMilli) if wp.is_underlying_int() => {
734                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMilli)
735                    }
736                    (wp, SchemaPiece::TimestampMicro) if wp.is_underlying_int() => {
737                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntTsMicro)
738                    }
739                    (SchemaPiece::Null, SchemaPiece::Null) => {
740                        SchemaPieceOrNamed::Piece(SchemaPiece::Null)
741                    }
742                    (SchemaPiece::Boolean, SchemaPiece::Boolean) => {
743                        SchemaPieceOrNamed::Piece(SchemaPiece::Boolean)
744                    }
745                    (SchemaPiece::Int, SchemaPiece::Long) => {
746                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntLong)
747                    }
748                    (SchemaPiece::Int, SchemaPiece::Float) => {
749                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntFloat)
750                    }
751                    (SchemaPiece::Int, SchemaPiece::Double) => {
752                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveIntDouble)
753                    }
754                    (SchemaPiece::Long, SchemaPiece::Float) => {
755                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongFloat)
756                    }
757                    (SchemaPiece::Long, SchemaPiece::Double) => {
758                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveLongDouble)
759                    }
760                    (SchemaPiece::Float, SchemaPiece::Float) => {
761                        SchemaPieceOrNamed::Piece(SchemaPiece::Float)
762                    }
763                    (SchemaPiece::Float, SchemaPiece::Double) => {
764                        SchemaPieceOrNamed::Piece(SchemaPiece::ResolveFloatDouble)
765                    }
766                    (SchemaPiece::Double, SchemaPiece::Double) => {
767                        SchemaPieceOrNamed::Piece(SchemaPiece::Double)
768                    }
769                    (b, SchemaPiece::Bytes)
770                        if b == &SchemaPiece::Bytes || b == &SchemaPiece::String =>
771                    {
772                        SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
773                    }
774                    (s, SchemaPiece::String)
775                        if s == &SchemaPiece::String || s == &SchemaPiece::Bytes =>
776                    {
777                        SchemaPieceOrNamed::Piece(SchemaPiece::String)
778                    }
779                    (SchemaPiece::Array(w_inner), SchemaPiece::Array(r_inner)) => {
780                        let inner =
781                            self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
782                        SchemaPieceOrNamed::Piece(SchemaPiece::Array(Box::new(inner)))
783                    }
784                    (SchemaPiece::Map(w_inner), SchemaPiece::Map(r_inner)) => {
785                        let inner =
786                            self.resolve(writer.step(&**w_inner), reader.step(&**r_inner))?;
787                        SchemaPieceOrNamed::Piece(SchemaPiece::Map(Box::new(inner)))
788                    }
789                    (
790                        SchemaPiece::Decimal {
791                            precision: wp,
792                            scale: ws,
793                            fixed_size: wf,
794                        },
795                        SchemaPiece::Decimal {
796                            precision: rp,
797                            scale: rs,
798                            fixed_size: rf,
799                        },
800                    ) => {
801                        if wp == rp && ws == rs && wf == rf {
802                            SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
803                                precision: *wp,
804                                scale: *ws,
805                                fixed_size: *wf,
806                            })
807                        } else {
808                            return Err(SchemaResolutionError::new(format!(
809                                "Decimal types must match in precision, scale, and fixed size. \
810                                Got ({:?}, {:?}, {:?}); ({:?}, {:?}. {:?}) for field `{}`",
811                                wp,
812                                ws,
813                                wf,
814                                rp,
815                                rs,
816                                rf,
817                                self.get_current_human_readable_path(),
818                            ))
819                            .into());
820                        }
821                    }
822                    (SchemaPiece::Decimal { fixed_size, .. }, SchemaPiece::Bytes)
823                        if *fixed_size == None =>
824                    {
825                        SchemaPieceOrNamed::Piece(SchemaPiece::Bytes)
826                    }
827                    // TODO [btv] We probably want to rethink what we're doing here, rather than just add
828                    // a new branch for every possible "logical" type. Perhaps logical types with the
829                    // same underlying type should always be resolvable to the reader schema's type?
830                    (SchemaPiece::Json, SchemaPiece::Json) => {
831                        SchemaPieceOrNamed::Piece(SchemaPiece::Json)
832                    }
833                    (SchemaPiece::Uuid, SchemaPiece::Uuid) => {
834                        SchemaPieceOrNamed::Piece(SchemaPiece::Uuid)
835                    }
836                    (
837                        SchemaPiece::Bytes,
838                        SchemaPiece::Decimal {
839                            precision,
840                            scale,
841                            fixed_size,
842                        },
843                    ) if *fixed_size == None => SchemaPieceOrNamed::Piece(SchemaPiece::Decimal {
844                        precision: *precision,
845                        scale: *scale,
846                        fixed_size: *fixed_size,
847                    }),
848                    (ws, rs) => {
849                        return Err(SchemaResolutionError::new(format!(
850                            "Writer schema has type `{:?}`, but reader schema has type `{:?}` for field `{}`",
851                            ws,
852                            rs,
853                            self.get_current_human_readable_path(),
854                        ))
855                        .into());
856                    }
857                }
858            }
859            // Named types
860            (SchemaPieceRefOrNamed::Named(w_index), SchemaPieceRefOrNamed::Named(r_index)) => {
861                if self.writer_to_reader_names.get(&w_index) != Some(&r_index) {
862                    // The nodes in the two schemas have different names. Resolution fails.
863                    let (w_name, r_name) = (
864                        &writer.root.lookup(w_index).name,
865                        &reader.root.lookup(r_index).name,
866                    );
867                    return Err(SchemaResolutionError::new(format!("Attempted to resolve writer schema node named {:?} against reader schema node named {:?}", w_name, r_name)).into());
868                }
869                // Check if we have already resolved the name previously, and if so, return a reference to
870                // it (in the new schema's namespace).
871                let idx = match self.reader_to_resolved_names.get(&r_index) {
872                    Some(resolved) => *resolved,
873                    None => {
874                        // We have not resolved this name yet; do so, and record it in the set of named schemas.
875                        // We need to push a placeholder beforehand, because schemas can be recursive;
876                        // a schema nested under this one may reference it.
877                        // A plausible example: {"type": "record", "name": "linked_list", "fields": [{"name": "next", "type": ["null", "linked_list"]}]}
878                        // Thus, `self.reader_to_resolved_names` needs to be correct for this node's index *before* we traverse the nodes under it.
879                        let resolved_idx = self.named.len();
880                        self.reader_to_resolved_names.insert(r_index, resolved_idx);
881                        self.named.push(None);
882                        let piece =
883                            match self.resolve_named(writer.root, reader.root, w_index, r_index) {
884                                Ok(piece) => piece,
885                                Err(e) => {
886                                    // Roll back to the state before this node. We must remove not
887                                    // only this node's placeholder but also any nested named nodes
888                                    // resolved while resolving it: they live at indices
889                                    // `>= resolved_idx` and are unreachable now that this resolution
890                                    // failed. A plain `pop()` removed only the last one, orphaning a
891                                    // `None` placeholder (which a later `Option::unwrap` panics on)
892                                    // whenever a nested node had been pushed. Union resolution stores
893                                    // rather than propagates this error, so the orphan would survive.
894                                    self.named.truncate(resolved_idx);
895                                    self.reader_to_resolved_names
896                                        .retain(|_, v| *v < resolved_idx);
897                                    self.indices.retain(|_, v| *v < resolved_idx);
898                                    return Err(e);
899                                }
900                            };
901                        let name = &self.reader_schema.named[r_index].name;
902                        let ns = NamedSchemaPiece {
903                            name: name.clone(),
904                            piece,
905                        };
906                        self.named[resolved_idx] = Some(ns);
907                        self.indices.insert(name.clone(), resolved_idx);
908
909                        resolved_idx
910                    }
911                };
912                SchemaPieceOrNamed::Named(idx)
913            }
914            (ws, rs) => {
915                return Err(SchemaResolutionError::new(format!(
916                    "Schemas don't match: {:?}, {:?} for field `{}`",
917                    ws.get_piece_and_name(writer.root).0,
918                    rs.get_piece_and_name(reader.root).0,
919                    self.get_current_human_readable_path(),
920                ))
921                .into());
922            }
923        };
924        if named_node.is_some() {
925            self.human_readable_field_path.pop();
926            self.current_human_readable_path_start = previous_human_readable_path_start;
927        }
928        Ok(inner)
929    }
930
931    fn get_current_human_readable_path(&self) -> String {
932        self.human_readable_field_path[self.current_human_readable_path_start..].join(".")
933    }
934}
935
936/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
937/// to read from.
938///
939/// In case a reader `Schema` is provided, schema resolution will also be performed.
940///
941/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
942/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
943/// you are doing, instead.
944pub fn from_avro_datum<R: AvroRead>(schema: &Schema, reader: &mut R) -> Result<Value, AvroError> {
945    let value = decode(schema.top_node(), reader)?;
946    Ok(value)
947}
948
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use mz_ore::assert_err;

    use crate::Reader;
    use crate::types::{Record, ToAvro};

    use super::*;

    /// Parses `writer` and `reader` as schemas, resolves the writer against the reader, and
    /// returns the message carried by the resulting `ResolveSchema` error.
    ///
    /// Panics if either schema fails to parse, if resolution succeeds, or if resolution fails
    /// with any other kind of error.
    fn resolution_error_message(writer: &str, reader: &str) -> String {
        let reader: Schema = reader.parse().unwrap();
        let writer: Schema = writer.parse().unwrap();
        match resolve_schemas(&writer, &reader) {
            Err(AvroError::ResolveSchema(SchemaResolutionError(s))) => s,
            _ => panic!("Expected schema resolution failure"),
        }
    }

    #[mz_ore::test]
    fn reader_rejects_huge_block_object_count() {
        // A crafted object-container block with a huge object count and a
        // zero-byte (`null`) schema must be rejected, not spin decoding billions
        // of empty values. Regression for the reader_decode cargo-fuzz timeout.
        let bytes: &[u8] = &[
            0x4f, 0x62, 0x6a, 0x01, 0x04, 0x16, 0x61, 0x76, 0x72, 0x6f, 0x2e, 0x73, 0x63, 0x68,
            0x65, 0x6d, 0x61, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x22,
            0x6e, 0x75, 0x6c, 0x6c, 0x22, 0x20, 0x00, 0x00, 0x00, 0x64, 0x64, 0x64, 0x64, 0x64,
            0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0xbf, 0x00, 0x26,
            0x35, 0x33, 0x39, 0x33, 0x34, 0x38, 0x33, 0xcd, 0x45, 0x38, 0x56, 0xb1, 0x00, 0x00,
            0x64, 0x64, 0x7a, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64,
            0x64, 0x64, 0x64, 0x64, 0x64, 0x64, 0x64,
        ];
        let reader = Reader::new(bytes).expect("OCF header parses");
        // Iteration must terminate with an error (the oversized count is
        // rejected), not hang.
        assert!(
            reader.into_iter().any(|item| item.is_err()),
            "expected a decode error from the oversized block count"
        );
    }

    static SCHEMA: &str = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "long", "default": 42},
                    {"name": "b", "type": "string"}
                ]
            }
        "#;
    static UNION_SCHEMA: &str = r#"
            ["null", "long"]
        "#;
    static ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[mz_ore::test]
    fn test_from_avro_datum() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(schema.top_node()).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.avro();

        assert_eq!(from_avro_datum(&schema, &mut encoded).unwrap(), expected);
    }

    #[mz_ore::test]
    fn test_null_union() {
        let schema: Schema = UNION_SCHEMA.parse().unwrap();
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded).unwrap(),
            Value::Union {
                index: 1,
                inner: Box::new(Value::Long(0)),
                n_variants: 2,
                null_variant: Some(0)
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
    fn test_reader_stream() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let reader = Reader::with_schema(&schema, ENCODED).unwrap();

        let mut record1 = Record::new(schema.top_node()).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(schema.top_node()).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected = vec![record1.avro(), record2.avro()];

        // Compare the full sequence of yielded values, so that a reader yielding too few
        // (or too many) values fails the test instead of passing vacuously.
        let values = reader.map(|value| value.unwrap()).collect::<Vec<_>>();
        assert_eq!(values, expected);
    }

    #[mz_ore::test]
    fn test_reader_invalid_header() {
        let schema: Schema = SCHEMA.parse().unwrap();
        let invalid = ENCODED.iter().skip(1).copied().collect::<Vec<u8>>();
        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: inline assembly is not supported
    fn test_reader_invalid_block() {
        let schema: Schema = SCHEMA.parse().unwrap();
        // Drop the last 19 bytes: the trailing 16-byte sync marker plus the end of the
        // block's data, leaving a truncated block after a valid header.
        let invalid = ENCODED
            .iter()
            .rev()
            .skip(19)
            .copied()
            .collect::<Vec<u8>>()
            .into_iter()
            .rev()
            .collect::<Vec<u8>>();
        let reader = Reader::with_schema(&schema, &invalid[..]).unwrap();
        // Every yielded item must be an error. Note that this loop does not by itself
        // require that any item is yielded.
        for value in reader {
            assert_err!(value);
        }
    }

    #[mz_ore::test]
    fn test_reader_empty_buffer() {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());
    }

    #[mz_ore::test]
    fn test_reader_only_header() {
        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
        let reader = Reader::new(&invalid[..]).unwrap();
        // Every yielded item must be an error. Note that this loop does not by itself
        // require that any item is yielded.
        for value in reader {
            assert_err!(value);
        }
    }

    #[mz_ore::test]
    fn test_resolution_nested_types_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "int"}]}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "record", "name": "com.materialize.bar", "fields": [{"name": "f1_1", "type": "double"}]}}
    ]
}
"#;
        let err_str = resolution_error_message(w, r);
        // The field name here must NOT contain `com.materialize.foo`,
        // because explicitly named types are all relative to a global
        // namespace (i.e., they don't nest).
        assert_eq!(
            &err_str,
            "Writer schema has type `Double`, but reader schema has type `Int` for field `com.materialize.bar.f1_1`"
        );
    }

    #[mz_ore::test]
    fn test_extra_fields_without_default_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f2", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let err_str = resolution_error_message(w, r);
        assert_eq!(
            &err_str,
            "Reader field `com.materialize.foo.f2` not found in writer, and has no default"
        );
    }

    #[mz_ore::test]
    fn test_duplicate_field_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"},
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.bar",
    "fields": [
        {"name": "f1", "type": "int"}
    ]
}
"#;
        let err_str = resolution_error_message(w, r);
        assert_eq!(
            &err_str,
            "Duplicate field `com.materialize.bar.f1` in schema"
        );
    }

    #[mz_ore::test]
    fn test_decimal_field_mismatch_error() {
        let r = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}
    ]
}
"#;
        let w = r#"
{
    "type": "record",
    "name": "com.materialize.foo",
    "fields": [
        {"name": "f1", "type": {"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 1}}
    ]
}
"#;
        let err_str = resolution_error_message(w, r);
        // The `2. None` punctuation mirrors the format string currently used by the
        // decimal-mismatch error in schema resolution.
        assert_eq!(
            &err_str,
            "Decimal types must match in precision, scale, and fixed size. Got (5, 1, None); (4, 2. None) for field `com.materialize.foo.f1`"
        );
    }
}