mz_avro/
codec.rs

1// Copyright 2018 Flavien Raynaud.
2// Copyright Materialize, Inc. and contributors. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License in the LICENSE file at the
7// root of this repository, or online at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16//
17// This file is derived from the avro-rs project, available at
18// https://github.com/flavray/avro-rs. It was incorporated
19// directly into Materialize on March 3, 2020.
20//
21// The original source code is subject to the terms of the MIT license, a copy
22// of which can be found in the LICENSE file at the root of this repository.
23
24//! Logic for all supported compression codecs in Avro.
25use std::io::{Read, Write};
26use std::str::FromStr;
27
28use anyhow::Error;
29use flate2::Compression;
30use flate2::read::DeflateDecoder;
31use flate2::write::DeflateEncoder;
32
33use crate::error::{DecodeError, Error as AvroError};
34use crate::types::{ToAvro, Value};
35
/// The compression codec used to compress blocks.
///
/// Equality between codecs is total (the variants carry no data), so the type
/// implements `Eq` in addition to `PartialEq`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Codec {
    /// The `Null` codec simply passes data through uncompressed.
    Null,
    /// The `Deflate` codec writes the data block using the deflate algorithm
    /// as specified in RFC 1951, and typically implemented using the zlib library.
    /// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
    Deflate,
    #[cfg(feature = "snappy")]
    /// The `Snappy` codec uses Google's [Snappy](http://google.github.io/snappy/)
    /// compression library. Each compressed block is followed by the 4-byte, big-endian
    /// CRC32 checksum of the uncompressed data in the block.
    Snappy,
}
51
52impl ToAvro for Codec {
53    fn avro(self) -> Value {
54        Value::Bytes(
55            match self {
56                Codec::Null => "null",
57                Codec::Deflate => "deflate",
58                #[cfg(feature = "snappy")]
59                Codec::Snappy => "snappy",
60            }
61            .to_owned()
62            .into_bytes(),
63        )
64    }
65}
66
67impl FromStr for Codec {
68    type Err = AvroError;
69
70    fn from_str(s: &str) -> Result<Self, Self::Err> {
71        match s {
72            "null" => Ok(Codec::Null),
73            "deflate" => Ok(Codec::Deflate),
74            #[cfg(feature = "snappy")]
75            "snappy" => Ok(Codec::Snappy),
76            other => Err(DecodeError::UnrecognizedCodec(other.to_string()).into()),
77        }
78    }
79}
80
81impl Codec {
82    /// Compress a stream of bytes in-place.
83    pub fn compress(self, stream: &mut Vec<u8>) -> Result<(), Error> {
84        match self {
85            Codec::Null => (),
86            Codec::Deflate => {
87                let mut encoder = DeflateEncoder::new(Vec::new(), Compression::default());
88                encoder.write_all(stream)?;
89                *stream = encoder.finish()?;
90            }
91            #[cfg(feature = "snappy")]
92            Codec::Snappy => {
93                use byteorder::ByteOrder;
94
95                let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
96                let compressed_size =
97                    snap::raw::Encoder::new().compress(&stream[..], &mut encoded[..])?;
98
99                let crc = {
100                    let mut hasher = crc32fast::Hasher::new();
101                    hasher.update(stream);
102                    hasher.finalize()
103                };
104                byteorder::BigEndian::write_u32(&mut encoded[compressed_size..], crc);
105                encoded.truncate(compressed_size + 4);
106
107                *stream = encoded;
108            }
109        };
110
111        Ok(())
112    }
113
114    /// Decompress a stream of bytes in-place.
115    pub fn decompress(self, stream: &mut Vec<u8>) -> Result<(), AvroError> {
116        match self {
117            Codec::Null => (),
118            Codec::Deflate => {
119                let mut decoded = Vec::new();
120                {
121                    let mut decoder = DeflateDecoder::new(&**stream);
122                    decoder.read_to_end(&mut decoded)?;
123                }
124                *stream = decoded;
125            }
126            #[cfg(feature = "snappy")]
127            Codec::Snappy => {
128                use byteorder::ByteOrder;
129
130                let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
131                    .map_err(std::io::Error::from)?;
132                let mut decoded = vec![0; decompressed_size];
133                snap::raw::Decoder::new()
134                    .decompress(&stream[..stream.len() - 4], &mut decoded[..])
135                    .map_err(std::io::Error::from)?;
136
137                let expected_crc = byteorder::BigEndian::read_u32(&stream[stream.len() - 4..]);
138                let actual_crc = {
139                    let mut hasher = crc32fast::Hasher::new();
140                    hasher.update(&decoded);
141                    hasher.finalize()
142                };
143
144                if expected_crc != actual_crc {
145                    return Err(DecodeError::BadSnappyChecksum {
146                        expected: expected_crc,
147                        actual: actual_crc,
148                    }
149                    .into());
150                }
151                *stream = decoded;
152            }
153        };
154
155        Ok(())
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;

    static INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Compresses a fresh copy of `INPUT` with `codec` and returns the result.
    fn compress_input(codec: Codec) -> Vec<u8> {
        let mut buf = INPUT.to_vec();
        codec.compress(&mut buf).unwrap();
        buf
    }

    /// Decompresses `encoded` with `codec` and checks that `INPUT` comes back.
    fn assert_decodes_to_input(codec: Codec, mut encoded: Vec<u8>) {
        codec.decompress(&mut encoded).unwrap();
        assert_eq!(INPUT, encoded.as_slice());
    }

    /// The null codec must leave the data unchanged in both directions.
    #[mz_ore::test]
    fn null_compress_and_decompress() {
        let encoded = compress_input(Codec::Null);
        assert_eq!(INPUT, encoded.as_slice());
        assert_decodes_to_input(Codec::Null, encoded);
    }

    /// Deflate must shrink the repetitive input and round-trip it.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `deflateInit2_` on OS `linux`
    fn deflate_compress_and_decompress() {
        let encoded = compress_input(Codec::Deflate);
        assert_ne!(INPUT, encoded.as_slice());
        assert!(INPUT.len() > encoded.len());
        assert_decodes_to_input(Codec::Deflate, encoded);
    }

    /// Snappy must shrink the repetitive input and round-trip it.
    #[cfg(feature = "snappy")]
    #[mz_ore::test]
    fn snappy_compress_and_decompress() {
        let encoded = compress_input(Codec::Snappy);
        assert_ne!(INPUT, encoded.as_slice());
        assert!(INPUT.len() > encoded.len());
        assert_decodes_to_input(Codec::Snappy, encoded);
    }
}