bzip2/
read.rs

1//! Reader-based compression/decompression streams
2
3use std::io::prelude::*;
4use std::io::{self, BufReader};
5
6#[cfg(feature = "tokio")]
7use futures::Poll;
8#[cfg(feature = "tokio")]
9use tokio_io::{AsyncRead, AsyncWrite};
10
11use bufread;
12use Compression;
13
14/// A compression stream which wraps an uncompressed stream of data. Compressed
15/// data will be read from the stream.
16pub struct BzEncoder<R> {
17    inner: bufread::BzEncoder<BufReader<R>>,
18}
19
20/// A decompression stream which wraps a compressed stream of data. Decompressed
21/// data will be read from the stream.
22pub struct BzDecoder<R> {
23    inner: bufread::BzDecoder<BufReader<R>>,
24}
25
26impl<R: Read> BzEncoder<R> {
27    /// Create a new compression stream which will compress at the given level
28    /// to read compress output to the give output stream.
29    pub fn new(r: R, level: Compression) -> BzEncoder<R> {
30        BzEncoder {
31            inner: bufread::BzEncoder::new(BufReader::new(r), level),
32        }
33    }
34
35    /// Acquires a reference to the underlying stream
36    pub fn get_ref(&self) -> &R {
37        self.inner.get_ref().get_ref()
38    }
39
40    /// Acquires a mutable reference to the underlying stream
41    ///
42    /// Note that mutation of the stream may result in surprising results if
43    /// this encoder is continued to be used.
44    pub fn get_mut(&mut self) -> &mut R {
45        self.inner.get_mut().get_mut()
46    }
47
48    /// Unwrap the underlying writer, finishing the compression stream.
49    pub fn into_inner(self) -> R {
50        self.inner.into_inner().into_inner()
51    }
52
53    /// Returns the number of bytes produced by the compressor
54    /// (e.g. the number of bytes read from this stream)
55    ///
56    /// Note that, due to buffering, this only bears any relation to
57    /// total_in() when the compressor chooses to flush its data
58    /// (unfortunately, this won't happen in general
59    /// at the end of the stream, because the compressor doesn't know
60    /// if there's more data to come).  At that point,
61    /// `total_out() / total_in()` would be the compression ratio.
62    pub fn total_out(&self) -> u64 {
63        self.inner.total_out()
64    }
65
66    /// Returns the number of bytes consumed by the compressor
67    /// (e.g. the number of bytes read from the underlying stream)
68    pub fn total_in(&self) -> u64 {
69        self.inner.total_in()
70    }
71}
72
73impl<R: Read> Read for BzEncoder<R> {
74    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
75        self.inner.read(buf)
76    }
77}
78
// Marker impl (only with the `tokio` feature): the encoder is an
// `AsyncRead` whenever the wrapped reader is; no methods are overridden.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for BzEncoder<R> where R: AsyncRead {}
81
82impl<W: Write + Read> Write for BzEncoder<W> {
83    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
84        self.get_mut().write(buf)
85    }
86
87    fn flush(&mut self) -> io::Result<()> {
88        self.get_mut().flush()
89    }
90}
91
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for BzEncoder<R>
where
    R: AsyncWrite + Read,
{
    /// Delegates shutdown to the underlying stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let stream = self.get_mut();
        stream.shutdown()
    }
}
98
99impl<R: Read> BzDecoder<R> {
100    /// Create a new decompression stream, which will read compressed
101    /// data from the given input stream and decompress it.
102    pub fn new(r: R) -> BzDecoder<R> {
103        BzDecoder {
104            inner: bufread::BzDecoder::new(BufReader::new(r)),
105        }
106    }
107
108    /// Acquires a reference to the underlying stream
109    pub fn get_ref(&self) -> &R {
110        self.inner.get_ref().get_ref()
111    }
112
113    /// Acquires a mutable reference to the underlying stream
114    ///
115    /// Note that mutation of the stream may result in surprising results if
116    /// this encoder is continued to be used.
117    pub fn get_mut(&mut self) -> &mut R {
118        self.inner.get_mut().get_mut()
119    }
120
121    /// Unwrap the underlying writer, finishing the compression stream.
122    pub fn into_inner(self) -> R {
123        self.inner.into_inner().into_inner()
124    }
125
126    /// Returns the number of bytes produced by the decompressor
127    /// (e.g. the number of bytes read from this stream)
128    ///
129    /// Note that, due to buffering, this only bears any relation to
130    /// total_in() when the decompressor reaches a sync point
131    /// (e.g. where the original compressed stream was flushed).
132    /// At that point, `total_in() / total_out()` is the compression ratio.
133    pub fn total_out(&self) -> u64 {
134        self.inner.total_out()
135    }
136
137    /// Returns the number of bytes consumed by the decompressor
138    /// (e.g. the number of bytes read from the underlying stream)
139    pub fn total_in(&self) -> u64 {
140        self.inner.total_in()
141    }
142}
143
144impl<R: Read> Read for BzDecoder<R> {
145    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
146        self.inner.read(into)
147    }
148}
149
// Marker impl (only with the `tokio` feature): the decoder is an
// `AsyncRead` whenever the wrapped reader is; no methods are overridden.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for BzDecoder<R> where R: AsyncRead + Read {}
152
153impl<W: Write + Read> Write for BzDecoder<W> {
154    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
155        self.get_mut().write(buf)
156    }
157
158    fn flush(&mut self) -> io::Result<()> {
159        self.get_mut().flush()
160    }
161}
162
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for BzDecoder<R>
where
    R: AsyncWrite + Read,
{
    /// Delegates shutdown to the underlying stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let stream = self.get_mut();
        stream.shutdown()
    }
}
169
170/// A bzip2 streaming decoder that decodes all members of a multistream
171///
172/// Wikipedia, particularly, uses bzip2 multistream for their dumps.
173pub struct MultiBzDecoder<R> {
174    inner: bufread::MultiBzDecoder<BufReader<R>>,
175}
176
177impl<R: Read> MultiBzDecoder<R> {
178    /// Creates a new decoder from the given reader, immediately parsing the
179    /// (first) gzip header. If the gzip stream contains multiple members all will
180    /// be decoded.
181    pub fn new(r: R) -> MultiBzDecoder<R> {
182        MultiBzDecoder {
183            inner: bufread::MultiBzDecoder::new(BufReader::new(r)),
184        }
185    }
186}
187
188impl<R> MultiBzDecoder<R> {
189    /// Acquires a reference to the underlying reader.
190    pub fn get_ref(&self) -> &R {
191        self.inner.get_ref().get_ref()
192    }
193
194    /// Acquires a mutable reference to the underlying stream.
195    ///
196    /// Note that mutation of the stream may result in surprising results if
197    /// this encoder is continued to be used.
198    pub fn get_mut(&mut self) -> &mut R {
199        self.inner.get_mut().get_mut()
200    }
201
202    /// Consumes this decoder, returning the underlying reader.
203    pub fn into_inner(self) -> R {
204        self.inner.into_inner().into_inner()
205    }
206}
207
208impl<R: Read> Read for MultiBzDecoder<R> {
209    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
210        self.inner.read(into)
211    }
212}
213
// Marker impl (only with the `tokio` feature): the multistream decoder is an
// `AsyncRead` whenever the wrapped reader is; no methods are overridden.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for MultiBzDecoder<R> where R: AsyncRead {}
216
217impl<R: Read + Write> Write for MultiBzDecoder<R> {
218    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
219        self.get_mut().write(buf)
220    }
221
222    fn flush(&mut self) -> io::Result<()> {
223        self.get_mut().flush()
224    }
225}
226
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for MultiBzDecoder<R>
where
    R: AsyncWrite + AsyncRead,
{
    /// Delegates shutdown to the underlying stream.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        let stream = self.get_mut();
        stream.shutdown()
    }
}
233
#[cfg(test)]
mod tests {
    use partial_io::{GenInterrupted, PartialRead, PartialWithErrors};
    use rand::distributions::Standard;
    use rand::{thread_rng, Rng};
    use read::{BzDecoder, BzEncoder, MultiBzDecoder};
    use std::io::prelude::*;
    use Compression;

    /// Compresses a small buffer fully, then decompresses the result and
    /// checks the round trip.
    #[test]
    fn smoke() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let mut c = BzEncoder::new(m, Compression::default());
        let mut data = vec![];
        c.read_to_end(&mut data).unwrap();
        let mut d = BzDecoder::new(&data[..]);
        let mut data2 = Vec::new();
        d.read_to_end(&mut data2).unwrap();
        assert_eq!(data2, m);
    }

    /// Same round trip as `smoke`, but with the decoder reading directly
    /// from the encoder.
    #[test]
    fn smoke2() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let c = BzEncoder::new(m, Compression::default());
        let mut d = BzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        assert_eq!(data, [1, 2, 3, 4, 5, 6, 7, 8]);
    }

    /// Chained round trip with an input of 128 KiB + 1 bytes.
    #[test]
    fn smoke3() {
        let m = vec![3u8; 128 * 1024 + 1];
        let c = BzEncoder::new(&m[..], Compression::default());
        let mut d = BzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        // `assert!` rather than `assert_eq!` so a failure does not dump
        // 128 KiB of bytes.
        assert!(data == &m[..]);
    }

    /// The decoder must stop at the end of the compressed stream even when
    /// arbitrary bytes follow it.
    #[test]
    fn self_terminating() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = BzEncoder::new(&m[..], Compression::default());

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        // Append 200 copies of a 1 KiB random block after the stream.
        let v: Vec<u8> = thread_rng().sample_iter(&Standard).take(1024).collect();
        for _ in 0..200 {
            result.extend_from_slice(&v);
        }

        let mut d = BzDecoder::new(&result[..]);
        // Zero-initialised on purpose: passing uninitialised memory (via
        // `Vec::set_len`) to `Read::read` is undefined behaviour.
        let mut data = vec![0u8; m.len()];
        assert!(d.read(&mut data).unwrap() == m.len());
        assert!(data == &m[..]);
    }

    /// Reading into an empty buffer from a stream that encodes no data
    /// returns 0.
    #[test]
    fn zero_length_read_at_eof() {
        let m = Vec::new();
        let mut c = BzEncoder::new(&m[..], Compression::default());

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = BzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert_eq!(d.read(&mut data).unwrap(), 0);
    }

    /// Reading into an empty buffer returns 0 even when the stream still
    /// has data to decode.
    #[test]
    fn zero_length_read_with_data() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = BzEncoder::new(&m[..], Compression::default());

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = BzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert_eq!(d.read(&mut data).unwrap(), 0);
    }

    /// Concatenates several compressed streams and checks that
    /// `MultiBzDecoder` decodes all of them.
    #[test]
    fn multistream_read_till_eof() {
        let m = vec![3u8; 128 * 1024 + 1];
        let repeat = 3;
        let mut result = Vec::new();

        for _i in 0..repeat {
            let mut c = BzEncoder::new(&m[..], Compression::default());
            c.read_to_end(&mut result).unwrap();
        }

        let mut d = MultiBzDecoder::new(&result[..]);
        let mut data = Vec::new();

        let a = d.read_to_end(&mut data).unwrap();
        let b = m.len() * repeat;
        assert!(a == b, "{} {}", a, b);
    }

    /// Round-tripping empty input yields empty output.
    #[test]
    fn empty() {
        let r = BzEncoder::new(&[][..], Compression::default());
        let mut r = BzDecoder::new(r);
        let mut v2 = Vec::new();
        r.read_to_end(&mut v2).unwrap();
        assert!(v2.is_empty());
    }

    /// Property test: any byte vector survives an encode/decode round trip.
    #[test]
    fn qc() {
        ::quickcheck::quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let r = BzEncoder::new(&v[..], Compression::default());
            let mut r = BzDecoder::new(r);
            let mut v2 = Vec::new();
            r.read_to_end(&mut v2).unwrap();
            v == v2
        }
    }

    /// Property test: the round trip also holds when both the encoder's and
    /// the decoder's sources are wrapped in `PartialRead` with generated op
    /// sequences (presumably short reads and `Interrupted` errors; see the
    /// `partial_io` crate).
    #[test]
    fn qc_partial() {
        quickcheck6::quickcheck(test as fn(_, _, _) -> _);

        fn test(
            v: Vec<u8>,
            encode_ops: PartialWithErrors<GenInterrupted>,
            decode_ops: PartialWithErrors<GenInterrupted>,
        ) -> bool {
            let r = BzEncoder::new(PartialRead::new(&v[..], encode_ops), Compression::default());
            let mut r = BzDecoder::new(PartialRead::new(r, decode_ops));
            let mut v2 = Vec::new();
            r.read_to_end(&mut v2).unwrap();
            v == v2
        }
    }
}
383}