snap/
read.rs

/*!
This module provides two `std::io::Read` implementations:

* [`read::FrameDecoder`](struct.FrameDecoder.html)
  wraps another `std::io::Read` implementation, and decompresses data encoded
  using the Snappy frame format. Use this if you have a compressed data source
  and wish to read it as uncompressed data.
* [`read::FrameEncoder`](struct.FrameEncoder.html)
  wraps another `std::io::Read` implementation, and compresses its data into
  the Snappy frame format. Use this if you have an uncompressed data source
  and wish to read it as compressed data.

Typically, `read::FrameDecoder` is the version that you'll want.
*/
15
16use std::cmp;
17use std::fmt;
18use std::io;
19
20use crate::bytes;
21use crate::compress::Encoder;
22use crate::crc32::CheckSummer;
23use crate::decompress::{decompress_len, Decoder};
24use crate::error::Error;
25use crate::frame::{
26    compress_frame, ChunkType, CHUNK_HEADER_AND_CRC_SIZE,
27    MAX_COMPRESS_BLOCK_SIZE, STREAM_BODY, STREAM_IDENTIFIER,
28};
29use crate::MAX_BLOCK_SIZE;
30
31/// The maximum size of a compressed block, including the header and stream
32/// identifier, that can be emitted by FrameEncoder.
33const MAX_READ_FRAME_ENCODER_BLOCK_SIZE: usize = STREAM_IDENTIFIER.len()
34    + CHUNK_HEADER_AND_CRC_SIZE
35    + MAX_COMPRESS_BLOCK_SIZE;
36
37/// A reader for decompressing a Snappy stream.
38///
39/// This `FrameDecoder` wraps any other reader that implements `std::io::Read`.
40/// Bytes read from this reader are decompressed using the
41/// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
42/// (file extension `sz`, MIME type `application/x-snappy-framed`).
43///
44/// This reader can potentially make many small reads from the underlying
45/// stream depending on its format, therefore, passing in a buffered reader
46/// may be beneficial.
47pub struct FrameDecoder<R: io::Read> {
48    /// The underlying reader.
49    r: R,
50    /// A Snappy decoder that we reuse that does the actual block based
51    /// decompression.
52    dec: Decoder,
53    /// A CRC32 checksummer that is configured to either use the portable
54    /// fallback version or the SSE4.2 accelerated version when the right CPU
55    /// features are available.
56    checksummer: CheckSummer,
57    /// The compressed bytes buffer, taken from the underlying reader.
58    src: Vec<u8>,
59    /// The decompressed bytes buffer. Bytes are decompressed from src to dst
60    /// before being passed back to the caller.
61    dst: Vec<u8>,
62    /// Index into dst: starting point of bytes not yet given back to caller.
63    dsts: usize,
64    /// Index into dst: ending point of bytes not yet given back to caller.
65    dste: usize,
66    /// Whether we've read the special stream header or not.
67    read_stream_ident: bool,
68}
69
70impl<R: io::Read> FrameDecoder<R> {
71    /// Create a new reader for streaming Snappy decompression.
72    pub fn new(rdr: R) -> FrameDecoder<R> {
73        FrameDecoder {
74            r: rdr,
75            dec: Decoder::new(),
76            checksummer: CheckSummer::new(),
77            src: vec![0; MAX_COMPRESS_BLOCK_SIZE],
78            dst: vec![0; MAX_BLOCK_SIZE],
79            dsts: 0,
80            dste: 0,
81            read_stream_ident: false,
82        }
83    }
84
85    /// Gets a reference to the underlying reader in this decoder.
86    pub fn get_ref(&self) -> &R {
87        &self.r
88    }
89
90    /// Gets a mutable reference to the underlying reader in this decoder.
91    ///
92    /// Note that mutation of the stream may result in surprising results if
93    /// this decoder is continued to be used.
94    pub fn get_mut(&mut self) -> &mut R {
95        &mut self.r
96    }
97
98    /// Gets the underlying reader of this decoder.
99    pub fn into_inner(self) -> R {
100        self.r
101    }
102}
103
104impl<R: io::Read> io::Read for FrameDecoder<R> {
105    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
106        macro_rules! fail {
107            ($err:expr) => {
108                return Err(io::Error::from($err))
109            };
110        }
111        loop {
112            if self.dsts < self.dste {
113                let len = cmp::min(self.dste - self.dsts, buf.len());
114                let dste = self.dsts.checked_add(len).unwrap();
115                buf[0..len].copy_from_slice(&self.dst[self.dsts..dste]);
116                self.dsts = dste;
117                return Ok(len);
118            }
119            if !read_exact_eof(&mut self.r, &mut self.src[0..4])? {
120                return Ok(0);
121            }
122            let ty = ChunkType::from_u8(self.src[0]);
123            if !self.read_stream_ident {
124                if ty != Ok(ChunkType::Stream) {
125                    fail!(Error::StreamHeader { byte: self.src[0] });
126                }
127                self.read_stream_ident = true;
128            }
129            let len64 = bytes::read_u24_le(&self.src[1..]) as u64;
130            if len64 > self.src.len() as u64 {
131                fail!(Error::UnsupportedChunkLength {
132                    len: len64,
133                    header: false,
134                });
135            }
136            let len = len64 as usize;
137            match ty {
138                Err(b) if 0x02 <= b && b <= 0x7F => {
139                    // Spec says that chunk types 0x02-0x7F are reserved and
140                    // conformant decoders must return an error.
141                    fail!(Error::UnsupportedChunkType { byte: b });
142                }
143                Err(b) if 0x80 <= b && b <= 0xFD => {
144                    // Spec says that chunk types 0x80-0xFD are reserved but
145                    // skippable.
146                    self.r.read_exact(&mut self.src[0..len])?;
147                }
148                Err(b) => {
149                    // Can never happen. 0x02-0x7F and 0x80-0xFD are handled
150                    // above in the error case. That leaves 0x00, 0x01, 0xFE
151                    // and 0xFF, each of which correspond to one of the four
152                    // defined chunk types.
153                    unreachable!("BUG: unhandled chunk type: {}", b);
154                }
155                Ok(ChunkType::Padding) => {
156                    // Just read and move on.
157                    self.r.read_exact(&mut self.src[0..len])?;
158                }
159                Ok(ChunkType::Stream) => {
160                    if len != STREAM_BODY.len() {
161                        fail!(Error::UnsupportedChunkLength {
162                            len: len64,
163                            header: true,
164                        })
165                    }
166                    self.r.read_exact(&mut self.src[0..len])?;
167                    if &self.src[0..len] != STREAM_BODY {
168                        fail!(Error::StreamHeaderMismatch {
169                            bytes: self.src[0..len].to_vec(),
170                        });
171                    }
172                }
173                Ok(ChunkType::Uncompressed) => {
174                    if len < 4 {
175                        fail!(Error::UnsupportedChunkLength {
176                            len: len as u64,
177                            header: false,
178                        });
179                    }
180                    let expected_sum = bytes::io_read_u32_le(&mut self.r)?;
181                    let n = len - 4;
182                    if n > self.dst.len() {
183                        fail!(Error::UnsupportedChunkLength {
184                            len: n as u64,
185                            header: false,
186                        });
187                    }
188                    self.r.read_exact(&mut self.dst[0..n])?;
189                    let got_sum =
190                        self.checksummer.crc32c_masked(&self.dst[0..n]);
191                    if expected_sum != got_sum {
192                        fail!(Error::Checksum {
193                            expected: expected_sum,
194                            got: got_sum,
195                        });
196                    }
197                    self.dsts = 0;
198                    self.dste = n;
199                }
200                Ok(ChunkType::Compressed) => {
201                    if len < 4 {
202                        fail!(Error::UnsupportedChunkLength {
203                            len: len as u64,
204                            header: false,
205                        });
206                    }
207                    let expected_sum = bytes::io_read_u32_le(&mut self.r)?;
208                    let sn = len - 4;
209                    if sn > self.src.len() {
210                        fail!(Error::UnsupportedChunkLength {
211                            len: len64,
212                            header: false,
213                        });
214                    }
215                    self.r.read_exact(&mut self.src[0..sn])?;
216                    let dn = decompress_len(&self.src)?;
217                    if dn > self.dst.len() {
218                        fail!(Error::UnsupportedChunkLength {
219                            len: dn as u64,
220                            header: false,
221                        });
222                    }
223                    self.dec
224                        .decompress(&self.src[0..sn], &mut self.dst[0..dn])?;
225                    let got_sum =
226                        self.checksummer.crc32c_masked(&self.dst[0..dn]);
227                    if expected_sum != got_sum {
228                        fail!(Error::Checksum {
229                            expected: expected_sum,
230                            got: got_sum,
231                        });
232                    }
233                    self.dsts = 0;
234                    self.dste = dn;
235                }
236            }
237        }
238    }
239}
240
241impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> {
242    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
243        f.debug_struct("FrameDecoder")
244            .field("r", &self.r)
245            .field("dec", &self.dec)
246            .field("checksummer", &self.checksummer)
247            .field("src", &"[...]")
248            .field("dst", &"[...]")
249            .field("dsts", &self.dsts)
250            .field("dste", &self.dste)
251            .field("read_stream_ident", &self.read_stream_ident)
252            .finish()
253    }
254}
255
256/// A reader for compressing data using snappy as it is read.
257///
258/// This `FrameEncoder` wraps any other reader that implements `std::io::Read`.
259/// Bytes read from this reader are compressed using the
260/// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
261/// (file extension `sz`, MIME type `application/x-snappy-framed`).
262///
263/// Usually you'll want
264/// [`read::FrameDecoder`](struct.FrameDecoder.html)
265/// (for decompressing while reading) or
266/// [`write::FrameEncoder`](../write/struct.FrameEncoder.html)
267/// (for compressing while writing) instead.
268///
269/// Unlike `FrameDecoder`, this will attempt to make large reads roughly
270/// equivalent to the size of a single Snappy block. Therefore, callers may not
271/// benefit from using a buffered reader.
272pub struct FrameEncoder<R: io::Read> {
273    /// Internally, we split `FrameEncoder` in two to keep the borrow checker
274    /// happy. The `inner` member contains everything that `read_frame` needs
275    /// to fetch a frame's worth of data and compress it.
276    inner: Inner<R>,
277    /// Data that we've encoded and are ready to return to our caller.
278    dst: Vec<u8>,
279    /// Starting point of bytes in `dst` not yet given back to the caller.
280    dsts: usize,
281    /// Ending point of bytes in `dst` that we want to give to our caller.
282    dste: usize,
283}
284
285struct Inner<R: io::Read> {
286    /// The underlying data source.
287    r: R,
288    /// An encoder that we reuse that does the actual block based compression.
289    enc: Encoder,
290    /// A CRC32 checksummer that is configured to either use the portable
291    /// fallback version or the SSE4.2 accelerated version when the right CPU
292    /// features are available.
293    checksummer: CheckSummer,
294    /// Data taken from the underlying `r`, and not yet compressed.
295    src: Vec<u8>,
296    /// Have we written the standard snappy header to `dst` yet?
297    wrote_stream_ident: bool,
298}
299
300impl<R: io::Read> FrameEncoder<R> {
301    /// Create a new reader for streaming Snappy compression.
302    pub fn new(rdr: R) -> FrameEncoder<R> {
303        FrameEncoder {
304            inner: Inner {
305                r: rdr,
306                enc: Encoder::new(),
307                checksummer: CheckSummer::new(),
308                src: vec![0; MAX_BLOCK_SIZE],
309                wrote_stream_ident: false,
310            },
311            dst: vec![0; MAX_READ_FRAME_ENCODER_BLOCK_SIZE],
312            dsts: 0,
313            dste: 0,
314        }
315    }
316
317    /// Gets a reference to the underlying reader in this decoder.
318    pub fn get_ref(&self) -> &R {
319        &self.inner.r
320    }
321
322    /// Gets a mutable reference to the underlying reader in this decoder.
323    ///
324    /// Note that mutation of the stream may result in surprising results if
325    /// this encoder is continued to be used.
326    pub fn get_mut(&mut self) -> &mut R {
327        &mut self.inner.r
328    }
329
330    /// Read previously compressed data from `self.dst`, returning the number of
331    /// bytes read. If `self.dst` is empty, returns 0.
332    fn read_from_dst(&mut self, buf: &mut [u8]) -> usize {
333        let available_bytes = self.dste - self.dsts;
334        let count = cmp::min(available_bytes, buf.len());
335        buf[..count].copy_from_slice(&self.dst[self.dsts..self.dsts + count]);
336        self.dsts += count;
337        count
338    }
339}
340
341impl<R: io::Read> io::Read for FrameEncoder<R> {
342    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
343        // Try reading previously compressed bytes from our `dst` buffer, if
344        // any.
345        let count = self.read_from_dst(buf);
346
347        if count > 0 {
348            // We had some bytes in our `dst` buffer that we used.
349            Ok(count)
350        } else if buf.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE {
351            // Our output `buf` is big enough that we can directly write into
352            // it, so bypass `dst` entirely.
353            self.inner.read_frame(buf)
354        } else {
355            // We need to refill `self.dst`, and then return some bytes from
356            // that.
357            let count = self.inner.read_frame(&mut self.dst)?;
358            self.dsts = 0;
359            self.dste = count;
360            Ok(self.read_from_dst(buf))
361        }
362    }
363}
364
365impl<R: io::Read> Inner<R> {
366    /// Read from `self.r`, and create a new frame, writing it to `dst`, which
367    /// must be at least `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes in size.
368    fn read_frame(&mut self, dst: &mut [u8]) -> io::Result<usize> {
369        debug_assert!(dst.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE);
370
371        // We make one read to the underlying reader. If the underlying reader
372        // doesn't fill the buffer but there are still bytes to be read, then
373        // compression won't be optimal. The alternative would be to block
374        // until our buffer is maximally full (or we see EOF), but this seems
375        // more surprising. In general, io::Read implementations should try to
376        // fill the caller's buffer as much as they can, so this seems like the
377        // better choice.
378        let nread = self.r.read(&mut self.src)?;
379        if nread == 0 {
380            return Ok(0);
381        }
382
383        // If we haven't yet written the stream header to `dst`, write it.
384        let mut dst_write_start = 0;
385        if !self.wrote_stream_ident {
386            dst[0..STREAM_IDENTIFIER.len()].copy_from_slice(STREAM_IDENTIFIER);
387            dst_write_start += STREAM_IDENTIFIER.len();
388            self.wrote_stream_ident = true;
389        }
390
391        // Reserve space for our chunk header. We need to use `split_at_mut` so
392        // that we can get two mutable slices pointing at non-overlapping parts
393        // of `dst`.
394        let (chunk_header, remaining_dst) =
395            dst[dst_write_start..].split_at_mut(CHUNK_HEADER_AND_CRC_SIZE);
396        dst_write_start += CHUNK_HEADER_AND_CRC_SIZE;
397
398        // Compress our frame if possible, telling `compress_frame` to always
399        // put the output in `dst`.
400        let frame_data = compress_frame(
401            &mut self.enc,
402            self.checksummer,
403            &self.src[..nread],
404            chunk_header,
405            remaining_dst,
406            true,
407        )?;
408        Ok(dst_write_start + frame_data.len())
409    }
410}
411
412impl<R: fmt::Debug + io::Read> fmt::Debug for FrameEncoder<R> {
413    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
414        f.debug_struct("FrameEncoder")
415            .field("inner", &self.inner)
416            .field("dst", &"[...]")
417            .field("dsts", &self.dsts)
418            .field("dste", &self.dste)
419            .finish()
420    }
421}
422
423impl<R: fmt::Debug + io::Read> fmt::Debug for Inner<R> {
424    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
425        f.debug_struct("Inner")
426            .field("r", &self.r)
427            .field("enc", &self.enc)
428            .field("checksummer", &self.checksummer)
429            .field("src", &"[...]")
430            .field("wrote_stream_ident", &self.wrote_stream_ident)
431            .finish()
432    }
433}
434
// read_exact_eof is like Read::read_exact, except that an EOF reported
// before any byte has been read is treated as a clean end of stream.
//
// Returns Ok(true) once buf has been completely filled, and Ok(false) if the
// reader reports EOF on the first read. An empty buf also yields Ok(false),
// because reading into it returns 0.
//
// EOF after some, but not all, of buf has been filled is reported as an
// UnexpectedEof error (from read_exact). As with read_exact,
// ErrorKind::Interrupted errors are retried rather than returned.
fn read_exact_eof<R: io::Read>(
    rdr: &mut R,
    buf: &mut [u8],
) -> io::Result<bool> {
    loop {
        match rdr.read(buf) {
            // EOF before anything was read: a clean end of stream.
            Ok(0) => return Ok(false),
            // Read everything with a single read call.
            Ok(i) if i == buf.len() => return Ok(true),
            // Some bytes are still missing; read_exact fills the rest (and
            // turns a premature EOF into UnexpectedEof).
            Ok(i) => {
                rdr.read_exact(&mut buf[i..])?;
                return Ok(true);
            }
            // A signal interrupted the read before any data arrived; retry,
            // matching read_exact's contract.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}