1use lzma_sys;
4use std::io;
5use std::io::prelude::*;
6
7#[cfg(feature = "tokio")]
8use futures::Poll;
9#[cfg(feature = "tokio")]
10use tokio_io::{try_nb, AsyncRead, AsyncWrite};
11
12use crate::stream::{Action, Check, Status, Stream};
13
14pub struct XzEncoder<W: Write> {
17 data: Stream,
18 obj: Option<W>,
19 buf: Vec<u8>,
20}
21
22pub struct XzDecoder<W: Write> {
25 data: Stream,
26 obj: Option<W>,
27 buf: Vec<u8>,
28}
29
30impl<W: Write> XzEncoder<W> {
31 pub fn new(obj: W, level: u32) -> XzEncoder<W> {
34 let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
35 XzEncoder::new_stream(obj, stream)
36 }
37
38 pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> {
41 XzEncoder {
42 data: stream,
43 obj: Some(obj),
44 buf: Vec::with_capacity(32 * 1024),
45 }
46 }
47
48 pub fn get_ref(&self) -> &W {
50 self.obj.as_ref().unwrap()
51 }
52
53 pub fn get_mut(&mut self) -> &mut W {
58 self.obj.as_mut().unwrap()
59 }
60
61 fn dump(&mut self) -> io::Result<()> {
62 while self.buf.len() > 0 {
63 let n = self.obj.as_mut().unwrap().write(&self.buf)?;
64 self.buf.drain(..n);
65 }
66 Ok(())
67 }
68
69 pub fn try_finish(&mut self) -> io::Result<()> {
80 loop {
81 self.dump()?;
82 let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
83 if res == Status::StreamEnd {
84 break;
85 }
86 }
87 self.dump()
88 }
89
90 pub fn finish(mut self) -> io::Result<W> {
101 self.try_finish()?;
102 Ok(self.obj.take().unwrap())
103 }
104
105 pub fn total_out(&self) -> u64 {
111 self.data.total_out()
112 }
113
114 pub fn total_in(&self) -> u64 {
117 self.data.total_in()
118 }
119}
120
121impl<W: Write> Write for XzEncoder<W> {
122 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
123 loop {
124 self.dump()?;
125
126 let total_in = self.total_in();
127 self.data
128 .process_vec(data, &mut self.buf, Action::Run)
129 .unwrap();
130 let written = (self.total_in() - total_in) as usize;
131
132 if written > 0 || data.len() == 0 {
133 return Ok(written);
134 }
135 }
136 }
137
138 fn flush(&mut self) -> io::Result<()> {
139 loop {
140 self.dump()?;
141 let status = self
142 .data
143 .process_vec(&[], &mut self.buf, Action::FullFlush)
144 .unwrap();
145 if status == Status::StreamEnd {
146 break;
147 }
148 }
149 self.obj.as_mut().unwrap().flush()
150 }
151}
152
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzEncoder<W>
where
    W: AsyncWrite,
{
    /// Finishes the encoded stream, then shuts down the wrapped writer.
    ///
    /// The result of `try_finish` goes through `try_nb!`, so this may return
    /// early before the wrapped writer's `shutdown` is reached.
    // NOTE(review): presumably `try_nb!` maps `WouldBlock` to "not ready" —
    // confirm against the tokio-io documentation.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        try_nb!(self.try_finish());
        let inner = self.get_mut();
        inner.shutdown()
    }
}
160
161impl<W: Read + Write> Read for XzEncoder<W> {
162 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
163 self.get_mut().read(buf)
164 }
165}
166
// Marker impl with no overridden methods: reads use the trait's defaults on
// top of the `Read` impl for `XzEncoder`.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzEncoder<W> where W: AsyncRead + AsyncWrite {}
169
170impl<W: Write> Drop for XzEncoder<W> {
171 fn drop(&mut self) {
172 if self.obj.is_some() {
173 let _ = self.try_finish();
174 }
175 }
176}
177
178impl<W: Write> XzDecoder<W> {
179 pub fn new(obj: W) -> XzDecoder<W> {
182 let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
183 XzDecoder::new_stream(obj, stream)
184 }
185
186 pub fn new_multi_decoder(obj: W) -> XzDecoder<W> {
189 let stream =
190 Stream::new_stream_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap();
191 XzDecoder::new_stream(obj, stream)
192 }
193
194 pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> {
200 XzDecoder {
201 data: stream,
202 obj: Some(obj),
203 buf: Vec::with_capacity(32 * 1024),
204 }
205 }
206
207 pub fn get_ref(&self) -> &W {
209 self.obj.as_ref().unwrap()
210 }
211
212 pub fn get_mut(&mut self) -> &mut W {
217 self.obj.as_mut().unwrap()
218 }
219
220 fn dump(&mut self) -> io::Result<()> {
221 if self.buf.len() > 0 {
222 self.obj.as_mut().unwrap().write_all(&self.buf)?;
223 self.buf.truncate(0);
224 }
225 Ok(())
226 }
227
228 fn try_finish(&mut self) -> io::Result<()> {
229 loop {
230 self.dump()?;
231 let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
232
233 if self.buf.is_empty() && res == Status::MemNeeded {
240 let msg = "xz compressed stream is truncated or otherwise corrupt";
241 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
242 }
243
244 if res == Status::StreamEnd {
245 break;
246 }
247 }
248 self.dump()
249 }
250
251 pub fn finish(&mut self) -> io::Result<W> {
253 self.try_finish()?;
254 Ok(self.obj.take().unwrap())
255 }
256
257 pub fn total_out(&self) -> u64 {
263 self.data.total_out()
264 }
265
266 pub fn total_in(&self) -> u64 {
269 self.data.total_in()
270 }
271}
272
273impl<W: Write> Write for XzDecoder<W> {
274 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
275 loop {
276 self.dump()?;
277
278 let before = self.total_in();
279 let res = self.data.process_vec(data, &mut self.buf, Action::Run)?;
280 let written = (self.total_in() - before) as usize;
281
282 if written > 0 || data.len() == 0 || res == Status::StreamEnd {
283 return Ok(written);
284 }
285 }
286 }
287
288 fn flush(&mut self) -> io::Result<()> {
289 self.dump()?;
290 self.obj.as_mut().unwrap().flush()
291 }
292}
293
#[cfg(feature = "tokio")]
impl<W> AsyncWrite for XzDecoder<W>
where
    W: AsyncWrite,
{
    /// Finishes the decoded stream, then shuts down the wrapped writer.
    ///
    /// The result of `try_finish` goes through `try_nb!`, so this may return
    /// early before the wrapped writer's `shutdown` is reached.
    // NOTE(review): presumably `try_nb!` maps `WouldBlock` to "not ready" —
    // confirm against the tokio-io documentation.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        try_nb!(self.try_finish());
        let inner = self.get_mut();
        inner.shutdown()
    }
}
301
302impl<W: Read + Write> Read for XzDecoder<W> {
303 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
304 self.get_mut().read(buf)
305 }
306}
307
// Marker impl with no overridden methods: reads use the trait's defaults on
// top of the `Read` impl for `XzDecoder`.
#[cfg(feature = "tokio")]
impl<W> AsyncRead for XzDecoder<W> where W: AsyncRead + AsyncWrite {}
310
311impl<W: Write> Drop for XzDecoder<W> {
312 fn drop(&mut self) {
313 if self.obj.is_some() {
314 let _ = self.try_finish();
315 }
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::{XzDecoder, XzEncoder};
    use std::io::prelude::*;
    use std::iter::repeat;

    /// Round-trips a short prefix plus a large repetitive payload through an
    /// encoder that writes into a decoder.
    #[test]
    fn smoke() {
        let payload = repeat("12345").take(100000).collect::<String>();

        let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
        encoder.write_all(b"12834").unwrap();
        encoder.write_all(payload.as_bytes()).unwrap();

        let mut decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();

        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        let expected = format!("12834{}", payload);
        assert!(expected.as_bytes() == &*data);
    }

    /// An empty write followed by finishing must round-trip to empty output.
    #[test]
    fn write_empty() {
        let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
        encoder.write(b"").unwrap();

        let mut decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    /// Property test: arbitrary byte vectors survive an encode/decode
    /// round trip.
    #[test]
    fn qc() {
        fn round_trips(input: Vec<u8>) -> bool {
            let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
            encoder.write_all(&input).unwrap();
            let output = encoder.finish().unwrap().finish().unwrap();
            input == output
        }

        ::quickcheck::quickcheck(round_trips as fn(_) -> _);
    }
}