mz_avro/
error.rs

// Copyright 2018 Flavien Raynaud.
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file is derived from the avro-rs project, available at
// https://github.com/flavray/avro-rs. It was incorporated
// directly into Materialize on March 3, 2020.
//
// The original source code is subject to the terms of the MIT license, a copy
// of which can be found in the LICENSE file at the root of this repository.
23
24use std::fmt;
25
26use chrono::NaiveDateTime;
27use fmt::{Debug, Display};
28
29use crate::types::ScalarKind;
30use crate::util::TsUnit;
31use crate::{ParseSchemaError, SchemaResolutionError};
32
33#[derive(Clone, Debug, Eq, PartialEq)]
34pub enum DecodeError {
35    CodecUtf8Error,
36    MapKeyUtf8Error,
37    StringUtf8Error,
38    UuidUtf8Error,
39    UnrecognizedCodec(String),
40    BadSnappyChecksum {
41        expected: u32,
42        actual: u32,
43    },
44    ExpectedNonnegInteger(i64),
45    BadTimestamp {
46        unit: TsUnit,
47        value: i64,
48    },
49    BadBoolean(u8),
50    BadDate(i32),
51    // The distinction between "bad" and "missing",
52    // for both unions and enums,
53    // is that a "bad" index was not found in the writer schema,
54    // meaning either the input is corrupt or there is a bug in this crate,
55    // whereas a "missing" index means the value was validly written,
56    // but can't be interpreted by the _reader_ schema
57    BadUnionIndex {
58        index: usize,
59        len: usize,
60    },
61    MissingUnionIndex(usize),
62    BadEnumIndex {
63        index: usize,
64        len: usize,
65    },
66    MissingEnumIndex {
67        index: usize,
68        symbol: String,
69    },
70    WrongUnionIndex {
71        expected: usize,
72        actual: usize,
73    },
74    UnexpectedRecord,
75    UnexpectedUnion,
76    UnexpectedArray,
77    UnexpectedMap,
78    UnexpectedEnum,
79    UnexpectedScalar,
80    UnexpectedDecimal,
81    UnexpectedBytes,
82    UnexpectedString,
83    UnexpectedJson,
84    UnexpectedUuid,
85    UnexpectedFixed,
86    UnexpectedScalarKind(ScalarKind),
87    WrongHeaderMagic([u8; 4]),
88    MissingAvroDotSchema,
89    I32OutOfRange(i64),
90    IntConversionError,
91    IntDecodeOverflow,
92    BadJson {
93        category: serde_json::error::Category,
94        /// A string representation of what we attempted to decode.
95        /// Ideally the original bytes,
96        /// but might be a re-serialization of a deserialized value,
97        /// if we no longer have access to the original.
98        bytes: Vec<u8>,
99    },
100    BadUuid(uuid::Error),
101    MismatchedBlockHeader {
102        expected: [u8; 16],
103        actual: [u8; 16],
104    },
105    DateOutOfRange(i32),
106    TimestampOutOfRange(NaiveDateTime),
107    Custom(String),
108}
109
110impl DecodeError {
111    fn fmt_inner(&self, f: &mut fmt::Formatter) -> fmt::Result {
112        match self {
113            DecodeError::UnrecognizedCodec(codec) => write!(f, "Unrecognized codec: {}", codec),
114            DecodeError::BadSnappyChecksum { expected, actual } => write!(
115                f,
116                "Bad Snappy CRC32; expected {:x} but got {:x}",
117                expected, actual
118            ),
119            DecodeError::ExpectedNonnegInteger(i) => {
120                write!(f, "Expected non-negative integer, got {}", i)
121            }
122            DecodeError::BadTimestamp { unit, value } => {
123                write!(f, "Invalid timestamp {value} {unit}")
124            }
125            DecodeError::BadBoolean(byte) => write!(f, "Invalid boolean: {:x}", byte),
126            DecodeError::BadDate(since_epoch) => {
127                write!(f, "Invalid num days since epoch: {}", since_epoch)
128            }
129            DecodeError::BadUnionIndex { index, len } => {
130                write!(f, "Union index out of bounds: {} (len: {})", index, len)
131            }
132            DecodeError::MissingUnionIndex(index) => {
133                write!(f, "Union variant not found in reader schema: {}", index)
134            }
135            DecodeError::BadEnumIndex { index, len } => write!(
136                f,
137                "Enum symbol index out of bounds: {} (len: {})",
138                index, len
139            ),
140            DecodeError::MissingEnumIndex { index, symbol } => write!(
141                f,
142                "Enum symbol {} at index {} in writer schema not found in reader",
143                symbol, index
144            ),
145            DecodeError::UnexpectedRecord => write!(f, "Unexpected record"),
146            DecodeError::UnexpectedUnion => write!(f, "Unexpected union"),
147            DecodeError::UnexpectedArray => write!(f, "Unexpected array"),
148            DecodeError::UnexpectedMap => write!(f, "Unexpected map"),
149            DecodeError::UnexpectedEnum => write!(f, "Unexpected enum"),
150            DecodeError::UnexpectedScalar => write!(f, "Unexpected scalar"),
151            DecodeError::UnexpectedDecimal => write!(f, "Unexpected decimal"),
152            DecodeError::UnexpectedBytes => write!(f, "Unexpected bytes"),
153            DecodeError::UnexpectedString => write!(f, "Unexpected string"),
154            DecodeError::UnexpectedJson => write!(f, "Unexpected json"),
155            DecodeError::UnexpectedUuid => write!(f, "Unexpected UUID"),
156            DecodeError::UnexpectedFixed => write!(f, "Unexpected fixed"),
157            DecodeError::UnexpectedScalarKind(kind) => {
158                write!(f, "Scalar of unexpected kind: {:?}", kind)
159            }
160            DecodeError::WrongHeaderMagic(magic) => write!(f, "Wrong header magic: {:x?}", magic),
161            DecodeError::MissingAvroDotSchema => write!(
162                f,
163                "Symbol's value as variable is void: avro.schema missing from header"
164            ),
165            DecodeError::I32OutOfRange(i) => write!(f, "Expected i32, got: {}", i),
166            DecodeError::IntDecodeOverflow => write!(f, "Overflow when decoding integer value"),
167            DecodeError::WrongUnionIndex { expected, actual } => write!(
168                f,
169                "Reader expected variant at index {}, got {}",
170                expected, actual
171            ),
172            DecodeError::Custom(inner) => write!(f, "Error in decode client: {}", inner),
173            DecodeError::CodecUtf8Error => write!(f, "Codec was not valid UTF-8"),
174            DecodeError::MapKeyUtf8Error => write!(f, "Map key was not valid UTF-8"),
175            DecodeError::StringUtf8Error => write!(f, "String was not valid UTF-8"),
176            DecodeError::UuidUtf8Error => write!(f, "UUID was not valid UTF-8"),
177            DecodeError::IntConversionError => write!(f, "Integer conversion failed"),
178            DecodeError::BadJson { category, bytes } => {
179                write!(f, "Json decoding failed: {:?}", category)?;
180                write!(f, " (got {})", String::from_utf8_lossy(bytes))
181            }
182            DecodeError::BadUuid(inner) => write!(f, "UUID decoding failed: {}", inner),
183            DecodeError::MismatchedBlockHeader { expected, actual } => write!(
184                f,
185                "Block marker ({:x?}) does not match header marker ({:x?})",
186                actual, expected
187            ),
188            DecodeError::DateOutOfRange(inner) => write!(f, "Date out of range: {}", inner),
189            DecodeError::TimestampOutOfRange(inner) => {
190                write!(f, "Timestamp out of range: {}", inner)
191            }
192        }
193    }
194}
195
196impl Display for DecodeError {
197    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
198        write!(f, "Decoding error: ")?;
199        self.fmt_inner(f)
200    }
201}
202
203#[derive(Clone, Debug, Eq, PartialEq)]
204// TODO (btv) - some context (where in the record the error occurred) would be nice.
205// We can probably get this from the schema; we would just need to pipe it through a ton of places.
206pub enum Error {
207    Decode(DecodeError),
208    ParseSchema(ParseSchemaError), // TODO (btv) - make this a typed enum, like we did for DecodeError.
209    ResolveSchema(SchemaResolutionError), // TODO (btv) - idem.
210    IO(std::io::ErrorKind),        // Keeping the full error would be nicer, but that isn't `Clone`.
211    Allocation { attempted: usize, allowed: usize },
212}
213
214impl From<std::io::Error> for Error {
215    fn from(e: std::io::Error) -> Self {
216        Self::IO(e.kind())
217    }
218}
219
220impl From<std::convert::Infallible> for Error {
221    fn from(_: std::convert::Infallible) -> Self {
222        unreachable!()
223    }
224}
225
226impl From<std::num::TryFromIntError> for Error {
227    fn from(_: std::num::TryFromIntError) -> Self {
228        Self::Decode(DecodeError::IntConversionError)
229    }
230}
231
232impl From<DecodeError> for Error {
233    fn from(inner: DecodeError) -> Self {
234        Self::Decode(inner)
235    }
236}
237
238impl From<ParseSchemaError> for Error {
239    fn from(inner: ParseSchemaError) -> Self {
240        Self::ParseSchema(inner)
241    }
242}
243
244impl From<SchemaResolutionError> for Error {
245    fn from(inner: SchemaResolutionError) -> Self {
246        Self::ResolveSchema(inner)
247    }
248}
249
250impl Display for Error {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        match self {
253            Error::Decode(inner) => write!(f, "Decode error: {}", inner),
254            Error::ParseSchema(inner) => write!(f, "Schema parse error: {}", inner),
255            Error::IO(inner_kind) => write!(f, "IO error: {:?}", inner_kind),
256            Error::Allocation { attempted, allowed } => write!(
257                f,
258                "Allocation error: attempt to allocate {} bytes (maximum allowed: {})",
259                attempted, allowed
260            ),
261            Error::ResolveSchema(inner) => write!(f, "Schema resolution error: {}", inner),
262        }
263    }
264}
265
266impl std::error::Error for Error {}