xz2/
read.rs

1//! Reader-based compression/decompression streams
2
3use std::io::prelude::*;
4use std::io::{self, BufReader};
5
6#[cfg(feature = "tokio")]
7use futures::Poll;
8#[cfg(feature = "tokio")]
9use tokio_io::{AsyncRead, AsyncWrite};
10
11use crate::bufread;
12use crate::stream::Stream;
13
14/// A compression stream which wraps an uncompressed stream of data. Compressed
15/// data will be read from the stream.
16pub struct XzEncoder<R: Read> {
17    inner: bufread::XzEncoder<BufReader<R>>,
18}
19
20/// A decompression stream which wraps a compressed stream of data. Decompressed
21/// data will be read from the stream.
22pub struct XzDecoder<R: Read> {
23    inner: bufread::XzDecoder<BufReader<R>>,
24}
25
26impl<R: Read> XzEncoder<R> {
27    /// Create a new compression stream which will compress at the given level
28    /// to read compress output to the give output stream.
29    ///
30    /// The `level` argument here is typically 0-9 with 6 being a good default.
31    pub fn new(r: R, level: u32) -> XzEncoder<R> {
32        XzEncoder {
33            inner: bufread::XzEncoder::new(BufReader::new(r), level),
34        }
35    }
36
37    /// Creates a new encoder with a custom `Stream`.
38    ///
39    /// The `Stream` can be pre-configured for multithreaded encoding, different
40    /// compression options/tuning, etc.
41    pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
42        XzEncoder {
43            inner: bufread::XzEncoder::new_stream(BufReader::new(r), stream),
44        }
45    }
46
47    /// Acquires a reference to the underlying stream
48    pub fn get_ref(&self) -> &R {
49        self.inner.get_ref().get_ref()
50    }
51
52    /// Acquires a mutable reference to the underlying stream
53    ///
54    /// Note that mutation of the stream may result in surprising results if
55    /// this encoder is continued to be used.
56    pub fn get_mut(&mut self) -> &mut R {
57        self.inner.get_mut().get_mut()
58    }
59
60    /// Unwrap the underlying writer, finishing the compression stream.
61    pub fn into_inner(self) -> R {
62        self.inner.into_inner().into_inner()
63    }
64
65    /// Returns the number of bytes produced by the compressor
66    /// (e.g. the number of bytes read from this stream)
67    ///
68    /// Note that, due to buffering, this only bears any relation to
69    /// total_in() when the compressor chooses to flush its data
70    /// (unfortunately, this won't happen this won't happen in general
71    /// at the end of the stream, because the compressor doesn't know
72    /// if there's more data to come).  At that point,
73    /// `total_out() / total_in()` would be the compression ratio.
74    pub fn total_out(&self) -> u64 {
75        self.inner.total_out()
76    }
77
78    /// Returns the number of bytes consumed by the compressor
79    /// (e.g. the number of bytes read from the underlying stream)
80    pub fn total_in(&self) -> u64 {
81        self.inner.total_in()
82    }
83}
84
85impl<R: Read> Read for XzEncoder<R> {
86    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
87        self.inner.read(buf)
88    }
89}
90
/// Marker impl: reads still go through the blocking-style `Read` impl.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzEncoder<R>
where
    R: AsyncRead,
{
}
93
94impl<W: Write + Read> Write for XzEncoder<W> {
95    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
96        self.get_mut().write(buf)
97    }
98
99    fn flush(&mut self) -> io::Result<()> {
100        self.get_mut().flush()
101    }
102}
103
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for XzEncoder<R>
where
    R: AsyncWrite + Read,
{
    /// Delegates directly to the wrapped stream's `shutdown`.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(self.get_mut())
    }
}
110
111impl<R: Read> XzDecoder<R> {
112    /// Create a new decompression stream, which will read compressed
113    /// data from the given input stream, and decompress one xz stream.
114    /// It may also consume input data that follows the xz stream.
115    /// Use [`xz::bufread::XzDecoder`] instead to process a mix of xz and non-xz data.
116    pub fn new(r: R) -> XzDecoder<R> {
117        XzDecoder {
118            inner: bufread::XzDecoder::new(BufReader::new(r)),
119        }
120    }
121
122    /// Create a new decompression stream, which will read compressed
123    /// data from the given input and decompress all the xz stream it contains.
124    pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
125        XzDecoder {
126            inner: bufread::XzDecoder::new_multi_decoder(BufReader::new(r)),
127        }
128    }
129
130    /// Creates a new decoder with a custom `Stream`.
131    ///
132    /// The `Stream` can be pre-configured for various checks, different
133    /// decompression options/tuning, etc.
134    pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
135        XzDecoder {
136            inner: bufread::XzDecoder::new_stream(BufReader::new(r), stream),
137        }
138    }
139
140    /// Acquires a reference to the underlying stream
141    pub fn get_ref(&self) -> &R {
142        self.inner.get_ref().get_ref()
143    }
144
145    /// Acquires a mutable reference to the underlying stream
146    ///
147    /// Note that mutation of the stream may result in surprising results if
148    /// this encoder is continued to be used.
149    pub fn get_mut(&mut self) -> &mut R {
150        self.inner.get_mut().get_mut()
151    }
152
153    /// Unwrap the underlying writer, finishing the compression stream.
154    pub fn into_inner(self) -> R {
155        self.inner.into_inner().into_inner()
156    }
157
158    /// Returns the number of bytes produced by the decompressor
159    /// (e.g. the number of bytes read from this stream)
160    ///
161    /// Note that, due to buffering, this only bears any relation to
162    /// total_in() when the decompressor reaches a sync point
163    /// (e.g. where the original compressed stream was flushed).
164    /// At that point, `total_in() / total_out()` is the compression ratio.
165    pub fn total_out(&self) -> u64 {
166        self.inner.total_out()
167    }
168
169    /// Returns the number of bytes consumed by the decompressor
170    /// (e.g. the number of bytes read from the underlying stream)
171    pub fn total_in(&self) -> u64 {
172        self.inner.total_in()
173    }
174}
175
176impl<R: Read> Read for XzDecoder<R> {
177    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
178        self.inner.read(buf)
179    }
180}
181
/// Marker impl: reads still go through the blocking-style `Read` impl.
///
/// `Read` is a supertrait of `AsyncRead`, so a separate `Read` bound is
/// unnecessary. This matches the bound on the encoder's `AsyncRead` impl.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzDecoder<R>
where
    R: AsyncRead,
{
}
184
185impl<W: Write + Read> Write for XzDecoder<W> {
186    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
187        self.get_mut().write(buf)
188    }
189
190    fn flush(&mut self) -> io::Result<()> {
191        self.get_mut().flush()
192    }
193}
194
#[cfg(feature = "tokio")]
impl<R> AsyncWrite for XzDecoder<R>
where
    R: AsyncWrite + Read,
{
    /// Delegates directly to the wrapped stream's `shutdown`.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(self.get_mut())
    }
}
201
#[cfg(test)]
mod tests {
    use crate::read::{XzDecoder, XzEncoder};
    use rand::{thread_rng, Rng};
    use std::io::prelude::*;
    use std::iter;

    #[test]
    fn smoke() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let mut c = XzEncoder::new(m, 6);
        let mut data = vec![];
        c.read_to_end(&mut data).unwrap();
        let mut d = XzDecoder::new(&data[..]);
        let mut data2 = Vec::new();
        d.read_to_end(&mut data2).unwrap();
        assert_eq!(data2, m);
    }

    #[test]
    fn smoke2() {
        let m: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8];
        let c = XzEncoder::new(m, 6);
        let mut d = XzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        assert_eq!(data, [1, 2, 3, 4, 5, 6, 7, 8]);
    }

    #[test]
    fn smoke3() {
        let m = vec![3u8; 128 * 1024 + 1];
        let c = XzEncoder::new(&m[..], 6);
        let mut d = XzDecoder::new(c);
        let mut data = vec![];
        d.read_to_end(&mut data).unwrap();
        // Plain `assert!` avoids dumping 128 KiB of bytes on failure.
        assert!(data == &m[..]);
    }

    #[test]
    fn self_terminating() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        // Append random trailing bytes after the xz stream. The single-stream
        // decoder must stop at the end of the xz stream rather than trying to
        // decode them.
        let mut rng = thread_rng();
        let v = iter::repeat_with(|| rng.gen::<u8>())
            .take(1024)
            .collect::<Vec<_>>();
        for _ in 0..200 {
            result.extend_from_slice(&v);
        }

        let mut d = XzDecoder::new(&result[..]);
        // Zero-initialize: handing uninitialized memory (via `set_len`) to
        // `read` is undefined behavior.
        let mut data = vec![0u8; m.len()];
        assert_eq!(d.read(&mut data).unwrap(), m.len());
        assert!(data == &m[..]);
    }

    #[test]
    fn zero_length_read_at_eof() {
        let m = Vec::new();
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = XzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert_eq!(d.read(&mut data).unwrap(), 0);
    }

    #[test]
    fn zero_length_read_with_data() {
        let m = vec![3u8; 128 * 1024 + 1];
        let mut c = XzEncoder::new(&m[..], 6);

        let mut result = Vec::new();
        c.read_to_end(&mut result).unwrap();

        let mut d = XzDecoder::new(&result[..]);
        let mut data = Vec::new();
        assert_eq!(d.read(&mut data).unwrap(), 0);
    }

    #[test]
    fn qc() {
        ::quickcheck::quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let r = XzEncoder::new(&v[..], 6);
            let mut r = XzDecoder::new(r);
            let mut v2 = Vec::new();
            r.read_to_end(&mut v2).unwrap();
            v == v2
        }
    }

    #[test]
    fn two_streams() {
        let mut input_stream1: Vec<u8> = Vec::new();
        let mut input_stream2: Vec<u8> = Vec::new();
        let mut all_input: Vec<u8> = Vec::new();

        // Generate input data.
        const STREAM1_SIZE: usize = 1024;
        for num in 0..STREAM1_SIZE {
            input_stream1.push(num as u8)
        }
        const STREAM2_SIZE: usize = 532;
        for num in 0..STREAM2_SIZE {
            input_stream2.push((num + 32) as u8)
        }
        all_input.extend(&input_stream1);
        all_input.extend(&input_stream2);

        // Make a vector with compressed data
        let mut decoder_input = Vec::new();
        {
            let mut encoder = XzEncoder::new(&input_stream1[..], 6);
            encoder.read_to_end(&mut decoder_input).unwrap();
        }
        {
            let mut encoder = XzEncoder::new(&input_stream2[..], 6);
            encoder.read_to_end(&mut decoder_input).unwrap();
        }

        // Decoder must be able to read the 2 concatenated xz streams and get the same data as input.
        let mut decoder_reader = &decoder_input[..];
        {
            // using `XzDecoder::new` here would fail because only 1 xz stream would be processed.
            let mut decoder = XzDecoder::new_multi_decoder(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; all_input.len()];

            assert_eq!(
                decoder.read(&mut decompressed_data).unwrap(),
                all_input.len()
            );
            assert_eq!(decompressed_data, &all_input[..]);
        }
    }
}
348}