async_compression/codec/flate/
encoder.rs

1use crate::{codec::Encode, util::PartialBuffer};
2use std::io;
3
4use flate2::{Compress, Compression, FlushCompress, Status};
5
6#[derive(Debug)]
7pub struct FlateEncoder {
8    compress: Compress,
9    flushed: bool,
10}
11
12impl FlateEncoder {
13    pub(crate) fn new(level: Compression, zlib_header: bool) -> Self {
14        Self {
15            compress: Compress::new(level, zlib_header),
16            flushed: true,
17        }
18    }
19
20    pub(crate) fn get_ref(&self) -> &Compress {
21        &self.compress
22    }
23
24    fn encode(
25        &mut self,
26        input: &mut PartialBuffer<impl AsRef<[u8]>>,
27        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
28        flush: FlushCompress,
29    ) -> io::Result<Status> {
30        let prior_in = self.compress.total_in();
31        let prior_out = self.compress.total_out();
32
33        let status = self
34            .compress
35            .compress(input.unwritten(), output.unwritten_mut(), flush)?;
36
37        input.advance((self.compress.total_in() - prior_in) as usize);
38        output.advance((self.compress.total_out() - prior_out) as usize);
39
40        Ok(status)
41    }
42}
43
44impl Encode for FlateEncoder {
45    fn encode(
46        &mut self,
47        input: &mut PartialBuffer<impl AsRef<[u8]>>,
48        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
49    ) -> io::Result<()> {
50        self.flushed = false;
51        match self.encode(input, output, FlushCompress::None)? {
52            Status::Ok => Ok(()),
53            Status::StreamEnd => unreachable!(),
54            Status::BufError => Err(io::Error::new(io::ErrorKind::Other, "unexpected BufError")),
55        }
56    }
57
58    fn flush(
59        &mut self,
60        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
61    ) -> io::Result<bool> {
62        // We need to keep track of whether we've already flushed otherwise we'll just keep writing
63        // out sync blocks continuously and probably never complete flushing.
64        if self.flushed {
65            return Ok(true);
66        }
67
68        self.encode(
69            &mut PartialBuffer::new(&[][..]),
70            output,
71            FlushCompress::Sync,
72        )?;
73
74        loop {
75            let old_len = output.written().len();
76            self.encode(
77                &mut PartialBuffer::new(&[][..]),
78                output,
79                FlushCompress::None,
80            )?;
81            if output.written().len() == old_len {
82                break;
83            }
84        }
85
86        self.flushed = true;
87        Ok(!output.unwritten().is_empty())
88    }
89
90    fn finish(
91        &mut self,
92        output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
93    ) -> io::Result<bool> {
94        self.flushed = false;
95        match self.encode(
96            &mut PartialBuffer::new(&[][..]),
97            output,
98            FlushCompress::Finish,
99        )? {
100            Status::Ok => Ok(false),
101            Status::StreamEnd => Ok(true),
102            Status::BufError => Err(io::Error::new(io::ErrorKind::Other, "unexpected BufError")),
103        }
104    }
105}