mz_avro/
util.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24use std::i64;
25use std::io::Read;
26
27use serde_json::{Map, Value};
28
29use crate::error::{DecodeError, Error as AvroError};
30
/// Upper bound, in bytes, on a single allocation made while decoding
/// Avro-encoded values (512 MiB). It guards against ill-formed input
/// whose length field would otherwise be read as an enormous size.
pub const MAX_ALLOCATION_BYTES: usize = 512 << 20;
35
/// Unit tag with two granularities.
///
/// NOTE(review): presumably the precision of Avro timestamp values;
/// confirm against the users of this type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TsUnit {
    /// Millisecond granularity.
    Millis,
    /// Microsecond granularity.
    Micros,
}
41
/// Convenience accessors for string-valued entries of a keyed container.
pub trait MapHelper {
    /// Returns an owned copy of the string associated with `key`, or
    /// `None` when the implementor has no such string.
    fn string(&self, key: &str) -> Option<String>;

    /// Shorthand for `self.string("doc")`.
    fn doc(&self) -> Option<String> {
        self.string("doc")
    }

    /// Shorthand for `self.string("name")`.
    fn name(&self) -> Option<String> {
        self.string("name")
    }
}
53
54impl MapHelper for Map<String, Value> {
55    fn string(&self, key: &str) -> Option<String> {
56        self.get(key)
57            .and_then(|v| v.as_str())
58            .map(|v| v.to_string())
59    }
60}
61
62pub fn read_long<R: Read>(reader: &mut R) -> Result<i64, AvroError> {
63    zag_i64(reader)
64}
65
66pub fn zig_i32(n: i32, buffer: &mut Vec<u8>) {
67    zig_i64(n as i64, buffer)
68}
69
70pub fn zig_i64(n: i64, buffer: &mut Vec<u8>) {
71    encode_variable(((n << 1) ^ (n >> 63)) as u64, buffer)
72}
73
74pub fn zag_i32<R: Read>(reader: &mut R) -> Result<i32, AvroError> {
75    let i = zag_i64(reader)?;
76    if i < i64::from(i32::min_value()) || i > i64::from(i32::max_value()) {
77        Err(AvroError::Decode(DecodeError::I32OutOfRange(i)))
78    } else {
79        Ok(i as i32)
80    }
81}
82
83pub fn zag_i64<R: Read>(reader: &mut R) -> Result<i64, AvroError> {
84    let z = decode_variable(reader)?;
85    Ok(if z & 0x1 == 0 {
86        (z >> 1) as i64
87    } else {
88        !(z >> 1) as i64
89    })
90}
91
/// Appends `z` to `buffer` as a little-endian base-128 varint: seven
/// payload bits per byte, least-significant group first, with the high
/// bit set on every byte except the last.
fn encode_variable(mut z: u64, buffer: &mut Vec<u8>) {
    // Emit continuation bytes while more than seven bits remain.
    while z > 0x7F {
        buffer.push((z as u8 & 0x7F) | 0x80);
        z >>= 7;
    }
    // Final byte: at most seven bits are left, so the high bit is clear.
    buffer.push(z as u8);
}
103
104fn decode_variable<R: Read>(reader: &mut R) -> Result<u64, AvroError> {
105    let mut i = 0u64;
106    let mut buf = [0u8; 1];
107
108    let mut j = 0;
109    loop {
110        if j > 9 {
111            // if j * 7 > 64
112            return Err(AvroError::Decode(DecodeError::IntDecodeOverflow));
113        }
114        reader.read_exact(&mut buf[..])?;
115        i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
116        if (buf[0] >> 7) == 0 {
117            break;
118        } else {
119            j += 1;
120        }
121    }
122
123    Ok(i)
124}
125
126pub fn safe_len(len: usize) -> Result<usize, AvroError> {
127    if len <= MAX_ALLOCATION_BYTES {
128        Ok(len)
129    } else {
130        Err(AvroError::Allocation {
131            attempted: len,
132            allowed: MAX_ALLOCATION_BYTES,
133        })
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use mz_ore::assert_err;

    /// A 32-bit value and the equal 64-bit value encode to the same bytes.
    #[mz_ore::test]
    fn test_zigzag() {
        let mut a = Vec::new();
        let mut b = Vec::new();
        zig_i32(42i32, &mut a);
        zig_i64(42i64, &mut b);
        assert_eq!(a, b);
    }

    /// Known encodings around the 32-bit and 64-bit boundaries.
    #[mz_ore::test]
    fn test_zig_i64() {
        let mut s = Vec::new();
        zig_i64(2_147_483_647_i64, &mut s);
        assert_eq!(s, [254, 255, 255, 255, 15]);

        s.clear();
        zig_i64(2_147_483_648_i64, &mut s);
        assert_eq!(s, [128, 128, 128, 128, 16]);

        s.clear();
        zig_i64(-2_147_483_648_i64, &mut s);
        assert_eq!(s, [255, 255, 255, 255, 15]);

        s.clear();
        zig_i64(-2_147_483_649_i64, &mut s);
        assert_eq!(s, [129, 128, 128, 128, 16]);

        s.clear();
        zig_i64(i64::MAX, &mut s);
        assert_eq!(s, [254, 255, 255, 255, 255, 255, 255, 255, 255, 1]);

        s.clear();
        zig_i64(i64::MIN, &mut s);
        assert_eq!(s, [255, 255, 255, 255, 255, 255, 255, 255, 255, 1]);
    }

    /// Known encodings around the points where the output grows a byte.
    #[mz_ore::test]
    fn test_zig_i32() {
        let mut s = Vec::new();
        zig_i32(1_073_741_823_i32, &mut s);
        assert_eq!(s, [254, 255, 255, 255, 7]);

        s.clear();
        zig_i32(-1_073_741_824_i32, &mut s);
        assert_eq!(s, [255, 255, 255, 255, 7]);

        s.clear();
        zig_i32(1_073_741_824_i32, &mut s);
        assert_eq!(s, [128, 128, 128, 128, 8]);

        s.clear();
        zig_i32(-1_073_741_825_i32, &mut s);
        assert_eq!(s, [129, 128, 128, 128, 8]);

        s.clear();
        zig_i32(2_147_483_647_i32, &mut s);
        assert_eq!(s, [254, 255, 255, 255, 15]);

        s.clear();
        zig_i32(-2_147_483_648_i32, &mut s);
        assert_eq!(s, [255, 255, 255, 255, 15]);
    }

    /// Encoding followed by decoding returns the original value.
    #[mz_ore::test]
    fn test_zig_zag_roundtrip() {
        let samples = [
            0i64,
            1,
            -1,
            63,
            -64,
            64,
            -65,
            2_147_483_647,
            -2_147_483_648,
            i64::MAX,
            i64::MIN,
        ];
        for &v in samples.iter() {
            let mut buf = Vec::new();
            zig_i64(v, &mut buf);
            assert!(matches!(zag_i64(&mut &buf[..]), Ok(d) if d == v));
            assert!(matches!(read_long(&mut &buf[..]), Ok(d) if d == v));
        }
    }

    /// `zag_i32` accepts the full i32 range and rejects anything wider.
    #[mz_ore::test]
    fn test_zag_i32_range() {
        let mut buf = Vec::new();
        zig_i64(2_147_483_647_i64, &mut buf);
        assert!(matches!(zag_i32(&mut &buf[..]), Ok(2_147_483_647)));

        buf.clear();
        zig_i64(-2_147_483_648_i64, &mut buf);
        assert!(matches!(zag_i32(&mut &buf[..]), Ok(-2_147_483_648)));

        buf.clear();
        zig_i64(2_147_483_648_i64, &mut buf);
        assert_err!(zag_i32(&mut &buf[..]));

        buf.clear();
        zig_i64(-2_147_483_649_i64, &mut buf);
        assert_err!(zag_i32(&mut &buf[..]));
    }

    #[mz_ore::test]
    fn test_overflow() {
        // Truncated input: every byte announces a successor, but the
        // stream ends after five bytes, so reading fails.
        let truncated: &[u8] = &[0xe1, 0xe1, 0xe1, 0xe1, 0xe1];
        assert_err!(decode_variable(&mut &truncated[..]));

        // Over-long input: more continuation bytes than a u64 can hold
        // must be reported as an overflow, not as an I/O error.
        let too_long: &[u8] = &[0xe1; 11];
        assert!(matches!(
            decode_variable(&mut &too_long[..]),
            Err(AvroError::Decode(DecodeError::IntDecodeOverflow))
        ));
    }

    #[mz_ore::test]
    fn test_safe_len() {
        assert_eq!(42usize, safe_len(42usize).unwrap());
        assert_err!(safe_len(1024 * 1024 * 1024));

        // The limit itself is allowed; one byte more is not.
        assert!(matches!(
            safe_len(MAX_ALLOCATION_BYTES),
            Ok(MAX_ALLOCATION_BYTES)
        ));
        assert_err!(safe_len(MAX_ALLOCATION_BYTES + 1));
    }
}