async_compression/codec/bzip2/
decoder.rs

1use crate::{codec::Decode, util::PartialBuffer};
2use std::{fmt, io};
3
4use bzip2::{Decompress, Status};
5
6pub struct BzDecoder {
7    decompress: Decompress,
8}
9
10impl fmt::Debug for BzDecoder {
11    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
12        write!(
13            f,
14            "BzDecoder {{total_in: {}, total_out: {}}}",
15            self.decompress.total_in(),
16            self.decompress.total_out()
17        )
18    }
19}
20
21impl BzDecoder {
22    pub(crate) fn new() -> Self {
23        Self {
24            decompress: Decompress::new(false),
25        }
26    }
27
28    fn decode(
29        &mut self,
30        input: &mut PartialBuffer<impl AsRef<[u8]>>,
31        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
32    ) -> io::Result<Status> {
33        let prior_in = self.decompress.total_in();
34        let prior_out = self.decompress.total_out();
35
36        let status = self
37            .decompress
38            .decompress(input.unwritten(), output.unwritten_mut())
39            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
40
41        input.advance((self.decompress.total_in() - prior_in) as usize);
42        output.advance((self.decompress.total_out() - prior_out) as usize);
43
44        Ok(status)
45    }
46}
47
48impl Decode for BzDecoder {
49    fn reinit(&mut self) -> io::Result<()> {
50        self.decompress = Decompress::new(false);
51        Ok(())
52    }
53
54    fn decode(
55        &mut self,
56        input: &mut PartialBuffer<impl AsRef<[u8]>>,
57        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
58    ) -> io::Result<bool> {
59        match self.decode(input, output)? {
60            // Decompression went fine, nothing much to report.
61            Status::Ok => Ok(false),
62
63            // The Flush action on a compression went ok.
64            Status::FlushOk => unreachable!(),
65
66            // THe Run action on compression went ok.
67            Status::RunOk => unreachable!(),
68
69            // The Finish action on compression went ok.
70            Status::FinishOk => unreachable!(),
71
72            // The stream's end has been met, meaning that no more data can be input.
73            Status::StreamEnd => Ok(true),
74
75            // There was insufficient memory in the input or output buffer to complete
76            // the request, but otherwise everything went normally.
77            Status::MemNeeded => Err(io::Error::new(io::ErrorKind::Other, "out of memory")),
78        }
79    }
80
81    fn flush(
82        &mut self,
83        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
84    ) -> io::Result<bool> {
85        self.decode(&mut PartialBuffer::new(&[][..]), output)?;
86
87        loop {
88            let old_len = output.written().len();
89            self.decode(&mut PartialBuffer::new(&[][..]), output)?;
90            if output.written().len() == old_len {
91                break;
92            }
93        }
94
95        Ok(!output.unwritten().is_empty())
96    }
97
98    fn finish(
99        &mut self,
100        _output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
101    ) -> io::Result<bool> {
102        Ok(true)
103    }
104}