xz2/
bufread.rs

1//! I/O streams for wrapping `BufRead` types as encoders/decoders
2
3use lzma_sys;
4use std::io;
5use std::io::prelude::*;
6
7#[cfg(feature = "tokio")]
8use futures::Poll;
9#[cfg(feature = "tokio")]
10use tokio_io::{AsyncRead, AsyncWrite};
11
12use crate::stream::{Action, Check, Status, Stream};
13
14/// An xz encoder, or compressor.
15///
16/// This structure implements a `BufRead` interface and will read uncompressed
17/// data from an underlying stream and emit a stream of compressed data.
18pub struct XzEncoder<R> {
19    obj: R,
20    data: Stream,
21}
22
23/// A xz decoder, or decompressor.
24///
25/// This structure implements a `BufRead` interface and takes a stream of
26/// compressed data as input, providing the decompressed data when read from.
27pub struct XzDecoder<R> {
28    obj: R,
29    data: Stream,
30}
31
32impl<R: BufRead> XzEncoder<R> {
33    /// Creates a new encoder which will read uncompressed data from the given
34    /// stream and emit the compressed stream.
35    ///
36    /// The `level` argument here is typically 0-9 with 6 being a good default.
37    pub fn new(r: R, level: u32) -> XzEncoder<R> {
38        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
39        XzEncoder::new_stream(r, stream)
40    }
41
42    /// Creates a new encoder with a custom `Stream`.
43    ///
44    /// The `Stream` can be pre-configured for multithreaded encoding, different
45    /// compression options/tuning, etc.
46    pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
47        XzEncoder {
48            obj: r,
49            data: stream,
50        }
51    }
52}
53
54impl<R> XzEncoder<R> {
55    /// Acquires a reference to the underlying stream
56    pub fn get_ref(&self) -> &R {
57        &self.obj
58    }
59
60    /// Acquires a mutable reference to the underlying stream
61    ///
62    /// Note that mutation of the stream may result in surprising results if
63    /// this encoder is continued to be used.
64    pub fn get_mut(&mut self) -> &mut R {
65        &mut self.obj
66    }
67
68    /// Consumes this encoder, returning the underlying reader.
69    pub fn into_inner(self) -> R {
70        self.obj
71    }
72
73    /// Returns the number of bytes produced by the compressor
74    /// (e.g. the number of bytes read from this stream)
75    ///
76    /// Note that, due to buffering, this only bears any relation to
77    /// total_in() when the compressor chooses to flush its data
78    /// (unfortunately, this won't happen in general at the end of the
79    /// stream, because the compressor doesn't know if there's more data
80    /// to come).  At that point, `total_out() / total_in()` would be
81    /// the compression ratio.
82    pub fn total_out(&self) -> u64 {
83        self.data.total_out()
84    }
85
86    /// Returns the number of bytes consumed by the compressor
87    /// (e.g. the number of bytes read from the underlying stream)
88    pub fn total_in(&self) -> u64 {
89        self.data.total_in()
90    }
91}
92
93impl<R: BufRead> Read for XzEncoder<R> {
94    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
95        loop {
96            let (read, consumed, eof, ret);
97            {
98                let input = self.obj.fill_buf()?;
99                eof = input.is_empty();
100                let before_out = self.data.total_out();
101                let before_in = self.data.total_in();
102                let action = if eof { Action::Finish } else { Action::Run };
103                ret = self.data.process(input, buf, action);
104                read = (self.data.total_out() - before_out) as usize;
105                consumed = (self.data.total_in() - before_in) as usize;
106            }
107            self.obj.consume(consumed);
108
109            ret.unwrap();
110
111            // If we haven't ready any data and we haven't hit EOF yet, then we
112            // need to keep asking for more data because if we return that 0
113            // bytes of data have been read then it will be interpreted as EOF.
114            if read == 0 && !eof && buf.len() > 0 {
115                continue;
116            }
117            return Ok(read);
118        }
119    }
120}
121
// Empty impl: every `AsyncRead` method is provided by the trait itself and is
// built on this type's `Read` implementation.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzEncoder<R> where R: BufRead + AsyncRead {}
124
125impl<W: Write> Write for XzEncoder<W> {
126    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
127        self.get_mut().write(buf)
128    }
129
130    fn flush(&mut self) -> io::Result<()> {
131        self.get_mut().flush()
132    }
133}
134
// Shutting down the encoder simply shuts down the wrapped object; the
// compression stream is not touched here.
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzEncoder<W>
where
    W: AsyncWrite,
{
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(&mut self.obj)
    }
}
141
142impl<R: BufRead> XzDecoder<R> {
143    /// Creates a new decoder which will decompress data read from the given
144    /// stream.
145    pub fn new(r: R) -> XzDecoder<R> {
146        let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
147        XzDecoder::new_stream(r, stream)
148    }
149
150    /// Creates a new decoder which will decompress data read from the given
151    /// input. All the concatenated xz streams from input will be consumed.
152    pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
153        let stream =
154            Stream::new_auto_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap();
155        XzDecoder::new_stream(r, stream)
156    }
157
158    /// Creates a new decoder with a custom `Stream`.
159    ///
160    /// The `Stream` can be pre-configured for various checks, different
161    /// decompression options/tuning, etc.
162    pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
163        XzDecoder {
164            obj: r,
165            data: stream,
166        }
167    }
168}
169
170impl<R> XzDecoder<R> {
171    /// Acquires a reference to the underlying stream
172    pub fn get_ref(&self) -> &R {
173        &self.obj
174    }
175
176    /// Acquires a mutable reference to the underlying stream
177    ///
178    /// Note that mutation of the stream may result in surprising results if
179    /// this encoder is continued to be used.
180    pub fn get_mut(&mut self) -> &mut R {
181        &mut self.obj
182    }
183
184    /// Consumes this decoder, returning the underlying reader.
185    pub fn into_inner(self) -> R {
186        self.obj
187    }
188
189    /// Returns the number of bytes that the decompressor has consumed.
190    ///
191    /// Note that this will likely be smaller than what the decompressor
192    /// actually read from the underlying stream due to buffering.
193    pub fn total_in(&self) -> u64 {
194        self.data.total_in()
195    }
196
197    /// Returns the number of bytes that the decompressor has produced.
198    pub fn total_out(&self) -> u64 {
199        self.data.total_out()
200    }
201}
202
203impl<R: BufRead> Read for XzDecoder<R> {
204    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
205        loop {
206            let (read, consumed, eof, ret);
207            {
208                let input = self.obj.fill_buf()?;
209                eof = input.is_empty();
210                let before_out = self.data.total_out();
211                let before_in = self.data.total_in();
212                ret = self
213                    .data
214                    .process(input, buf, if eof { Action::Finish } else { Action::Run });
215                read = (self.data.total_out() - before_out) as usize;
216                consumed = (self.data.total_in() - before_in) as usize;
217            }
218            self.obj.consume(consumed);
219
220            let status = ret?;
221            if read > 0 || eof || buf.len() == 0 {
222                if read == 0 && status != Status::StreamEnd && buf.len() > 0 {
223                    return Err(io::Error::new(
224                        io::ErrorKind::UnexpectedEof,
225                        "premature eof",
226                    ));
227                }
228                return Ok(read);
229            }
230            if consumed == 0 {
231                return Err(io::Error::new(
232                    io::ErrorKind::InvalidData,
233                    "corrupt xz stream",
234                ));
235            }
236        }
237    }
238}
239
// Empty impl: every `AsyncRead` method is provided by the trait itself and is
// built on this type's `Read` implementation.
#[cfg(feature = "tokio")]
impl<R> AsyncRead for XzDecoder<R> where R: BufRead + AsyncRead {}
242
243impl<W: Write> Write for XzDecoder<W> {
244    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
245        self.get_mut().write(buf)
246    }
247
248    fn flush(&mut self) -> io::Result<()> {
249        self.get_mut().flush()
250    }
251}
252
// Shutting down the decoder simply shuts down the wrapped object; the
// decompression stream is not touched here.
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzDecoder<W>
where
    W: AsyncWrite,
{
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        AsyncWrite::shutdown(&mut self.obj)
    }
}
259
#[cfg(test)]
mod tests {
    use crate::bufread::{XzDecoder, XzEncoder};
    use std::io::Read;

    /// A single xz stream followed by unrelated bytes must decode completely
    /// while leaving the trailing bytes in the underlying reader.
    #[test]
    fn compressed_and_trailing_data() {
        // Make a vector with compressed data...
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        let to_compress: Vec<u8> = (0..COMPRESSED_ORIG_SIZE).map(|num| num as u8).collect();
        let mut encoder = XzEncoder::new(&to_compress[..], 6);

        let mut decoder_input = Vec::new();
        encoder.read_to_end(&mut decoder_input).unwrap();

        // ...plus additional unrelated trailing data
        const ADDITIONAL_SIZE: usize = 123;
        let additional_data: Vec<u8> = (0..ADDITIONAL_SIZE)
            .map(|num| ((25 + num) % 256) as u8)
            .collect();
        decoder_input.extend(&additional_data);

        // Decoder must be able to read the compressed xz stream, and keep the trailing data.
        let mut decoder_reader = &decoder_input[..];
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; to_compress.len()];

            // `Read::read` may legally return fewer bytes than requested, so
            // fill the buffer with `read_exact` instead of asserting on the
            // length returned by a single call.
            decoder.read_exact(&mut decompressed_data).unwrap();
            assert_eq!(decompressed_data, to_compress);
        }

        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, additional_data);
    }
}