xz2/
write.rs

//! Writer-based compression/decompression streams
2
3use lzma_sys;
4use std::io;
5use std::io::prelude::*;
6
7#[cfg(feature = "tokio")]
8use futures::Poll;
9#[cfg(feature = "tokio")]
10use tokio_io::{try_nb, AsyncRead, AsyncWrite};
11
12use crate::stream::{Action, Check, Status, Stream};
13
14/// A compression stream which will have uncompressed data written to it and
15/// will write compressed data to an output stream.
16pub struct XzEncoder<W: Write> {
17    data: Stream,
18    obj: Option<W>,
19    buf: Vec<u8>,
20}
21
22/// A compression stream which will have compressed data written to it and
23/// will write uncompressed data to an output stream.
24pub struct XzDecoder<W: Write> {
25    data: Stream,
26    obj: Option<W>,
27    buf: Vec<u8>,
28}
29
30impl<W: Write> XzEncoder<W> {
31    /// Create a new compression stream which will compress at the given level
32    /// to write compress output to the give output stream.
33    pub fn new(obj: W, level: u32) -> XzEncoder<W> {
34        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
35        XzEncoder::new_stream(obj, stream)
36    }
37
38    /// Create a new encoder which will use the specified `Stream` to encode
39    /// (compress) data into the provided `obj`.
40    pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> {
41        XzEncoder {
42            data: stream,
43            obj: Some(obj),
44            buf: Vec::with_capacity(32 * 1024),
45        }
46    }
47
48    /// Acquires a reference to the underlying writer.
49    pub fn get_ref(&self) -> &W {
50        self.obj.as_ref().unwrap()
51    }
52
53    /// Acquires a mutable reference to the underlying writer.
54    ///
55    /// Note that mutating the output/input state of the stream may corrupt this
56    /// object, so care must be taken when using this method.
57    pub fn get_mut(&mut self) -> &mut W {
58        self.obj.as_mut().unwrap()
59    }
60
61    fn dump(&mut self) -> io::Result<()> {
62        while self.buf.len() > 0 {
63            let n = self.obj.as_mut().unwrap().write(&self.buf)?;
64            self.buf.drain(..n);
65        }
66        Ok(())
67    }
68
69    /// Attempt to finish this output stream, writing out final chunks of data.
70    ///
71    /// Note that this function can only be used once data has finished being
72    /// written to the output stream. After this function is called then further
73    /// calls to `write` may result in a panic.
74    ///
75    /// # Panics
76    ///
77    /// Attempts to write data to this stream may result in a panic after this
78    /// function is called.
79    pub fn try_finish(&mut self) -> io::Result<()> {
80        loop {
81            self.dump()?;
82            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
83            if res == Status::StreamEnd {
84                break;
85            }
86        }
87        self.dump()
88    }
89
90    /// Consumes this encoder, flushing the output stream.
91    ///
92    /// This will flush the underlying data stream and then return the contained
93    /// writer if the flush succeeded.
94    ///
95    /// Note that this function may not be suitable to call in a situation where
96    /// the underlying stream is an asynchronous I/O stream. To finish a stream
97    /// the `try_finish` (or `shutdown`) method should be used instead. To
98    /// re-acquire ownership of a stream it is safe to call this method after
99    /// `try_finish` or `shutdown` has returned `Ok`.
100    pub fn finish(mut self) -> io::Result<W> {
101        self.try_finish()?;
102        Ok(self.obj.take().unwrap())
103    }
104
105    /// Returns the number of bytes produced by the compressor
106    ///
107    /// Note that, due to buffering, this only bears any relation to
108    /// `total_in()` after a call to `flush()`.  At that point,
109    /// `total_out() / total_in()` is the compression ratio.
110    pub fn total_out(&self) -> u64 {
111        self.data.total_out()
112    }
113
114    /// Returns the number of bytes consumed by the compressor
115    /// (e.g. the number of bytes written to this stream.)
116    pub fn total_in(&self) -> u64 {
117        self.data.total_in()
118    }
119}
120
121impl<W: Write> Write for XzEncoder<W> {
122    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
123        loop {
124            self.dump()?;
125
126            let total_in = self.total_in();
127            self.data
128                .process_vec(data, &mut self.buf, Action::Run)
129                .unwrap();
130            let written = (self.total_in() - total_in) as usize;
131
132            if written > 0 || data.len() == 0 {
133                return Ok(written);
134            }
135        }
136    }
137
138    fn flush(&mut self) -> io::Result<()> {
139        loop {
140            self.dump()?;
141            let status = self
142                .data
143                .process_vec(&[], &mut self.buf, Action::FullFlush)
144                .unwrap();
145            if status == Status::StreamEnd {
146                break;
147            }
148        }
149        self.obj.as_mut().unwrap().flush()
150    }
151}
152
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzEncoder<W>
where
    W: AsyncWrite,
{
    /// Finishes the compressed stream via `try_finish`, then shuts down the
    /// wrapped writer.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        try_nb!(self.try_finish());
        let inner = self.get_mut();
        inner.shutdown()
    }
}
160
161impl<W: Read + Write> Read for XzEncoder<W> {
162    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
163        self.get_mut().read(buf)
164    }
165}
166
// Marker impl with no overridden methods; reads go through the `Read` impl,
// which forwards to the wrapped object.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzEncoder<W> where W: AsyncRead + AsyncWrite {}
169
170impl<W: Write> Drop for XzEncoder<W> {
171    fn drop(&mut self) {
172        if self.obj.is_some() {
173            let _ = self.try_finish();
174        }
175    }
176}
177
178impl<W: Write> XzDecoder<W> {
179    /// Creates a new decoding stream which will decode into `obj` one xz stream
180    /// from the input written to it.
181    pub fn new(obj: W) -> XzDecoder<W> {
182        let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
183        XzDecoder::new_stream(obj, stream)
184    }
185
186    /// Creates a new decoding stream which will decode into `obj` all the xz streams
187    /// from the input written to it.
188    pub fn new_multi_decoder(obj: W) -> XzDecoder<W> {
189        let stream =
190            Stream::new_stream_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap();
191        XzDecoder::new_stream(obj, stream)
192    }
193
194    /// Creates a new decoding stream which will decode all input written to it
195    /// into `obj`.
196    ///
197    /// A custom `stream` can be specified to configure what format this decoder
198    /// will recognize or configure other various decoding options.
199    pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> {
200        XzDecoder {
201            data: stream,
202            obj: Some(obj),
203            buf: Vec::with_capacity(32 * 1024),
204        }
205    }
206
207    /// Acquires a reference to the underlying writer.
208    pub fn get_ref(&self) -> &W {
209        self.obj.as_ref().unwrap()
210    }
211
212    /// Acquires a mutable reference to the underlying writer.
213    ///
214    /// Note that mutating the output/input state of the stream may corrupt this
215    /// object, so care must be taken when using this method.
216    pub fn get_mut(&mut self) -> &mut W {
217        self.obj.as_mut().unwrap()
218    }
219
220    fn dump(&mut self) -> io::Result<()> {
221        if self.buf.len() > 0 {
222            self.obj.as_mut().unwrap().write_all(&self.buf)?;
223            self.buf.truncate(0);
224        }
225        Ok(())
226    }
227
228    fn try_finish(&mut self) -> io::Result<()> {
229        loop {
230            self.dump()?;
231            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
232
233            // When decoding a truncated file, XZ returns LZMA_BUF_ERROR and
234            // decodes no new data, which corresponds to this crate's MemNeeded
235            // status.  Since we're finishing, we cannot provide more data so
236            // this is an error.
237            //
238            // See the 02_decompress.c example in xz-utils.
239            if self.buf.is_empty() && res == Status::MemNeeded {
240                let msg = "xz compressed stream is truncated or otherwise corrupt";
241                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
242            }
243
244            if res == Status::StreamEnd {
245                break;
246            }
247        }
248        self.dump()
249    }
250
251    /// Unwrap the underlying writer, finishing the compression stream.
252    pub fn finish(&mut self) -> io::Result<W> {
253        self.try_finish()?;
254        Ok(self.obj.take().unwrap())
255    }
256
257    /// Returns the number of bytes produced by the decompressor
258    ///
259    /// Note that, due to buffering, this only bears any relation to
260    /// `total_in()` after a call to `flush()`.  At that point,
261    /// `total_in() / total_out()` is the compression ratio.
262    pub fn total_out(&self) -> u64 {
263        self.data.total_out()
264    }
265
266    /// Returns the number of bytes consumed by the decompressor
267    /// (e.g. the number of bytes written to this stream.)
268    pub fn total_in(&self) -> u64 {
269        self.data.total_in()
270    }
271}
272
273impl<W: Write> Write for XzDecoder<W> {
274    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
275        loop {
276            self.dump()?;
277
278            let before = self.total_in();
279            let res = self.data.process_vec(data, &mut self.buf, Action::Run)?;
280            let written = (self.total_in() - before) as usize;
281
282            if written > 0 || data.len() == 0 || res == Status::StreamEnd {
283                return Ok(written);
284            }
285        }
286    }
287
288    fn flush(&mut self) -> io::Result<()> {
289        self.dump()?;
290        self.obj.as_mut().unwrap().flush()
291    }
292}
293
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzDecoder<W>
where
    W: AsyncWrite,
{
    /// Finishes decoding via `try_finish`, then shuts down the wrapped writer.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        try_nb!(self.try_finish());
        let inner = self.get_mut();
        inner.shutdown()
    }
}
301
302impl<W: Read + Write> Read for XzDecoder<W> {
303    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
304        self.get_mut().read(buf)
305    }
306}
307
// Marker impl with no overridden methods; reads go through the `Read` impl,
// which forwards to the wrapped object.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzDecoder<W> where W: AsyncRead + AsyncWrite {}
310
311impl<W: Write> Drop for XzDecoder<W> {
312    fn drop(&mut self) {
313        if self.obj.is_some() {
314            let _ = self.try_finish();
315        }
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::{XzDecoder, XzEncoder};
    use std::io::prelude::*;
    use std::iter::repeat;

    /// Pipes an encoder straight into a decoder and checks that a large,
    /// repetitive payload comes out unchanged.
    #[test]
    fn smoke() {
        let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
        encoder.write_all(b"12834").unwrap();
        let body: String = repeat("12345").take(100000).collect();
        encoder.write_all(body.as_bytes()).unwrap();

        let mut decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();

        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        let expected = format!("12834{}", body);
        assert!(expected.as_bytes() == &*data);
    }

    /// An empty write followed by finishing both ends yields no output.
    #[test]
    fn write_empty() {
        let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
        encoder.write(b"").unwrap();

        let mut decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    /// Property test: arbitrary byte vectors survive an encode/decode round trip.
    #[test]
    fn qc() {
        fn roundtrips(v: Vec<u8>) -> bool {
            let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        ::quickcheck::quickcheck(roundtrips as fn(_) -> _);
    }
}