zstd/stream/zio/
reader.rs

1use std::io::{self, BufRead, Read};
2
3use crate::stream::raw::{InBuffer, Operation, OutBuffer};
4
5// [ reader -> zstd ] -> output
6/// Implements the [`Read`] API around an [`Operation`].
7///
8/// This can be used to wrap a raw in-memory operation in a read-focused API.
9///
10/// It can wrap either a compression or decompression operation, and pulls
11/// input data from a wrapped `Read`.
12pub struct Reader<R, D> {
13    reader: R,
14    operation: D,
15
16    state: State,
17
18    single_frame: bool,
19    finished_frame: bool,
20}
21
/// Lifecycle stages of a `Reader`.
enum State {
    /// Input is still being pulled from the inner `Read`.
    Reading,
    /// The inner `Read` reached EOF; what remains is being flushed out.
    PastEof,
    /// Everything is done; nothing more can be read.
    Finished,
}
30
31impl<R, D> Reader<R, D> {
32    /// Creates a new `Reader`.
33    ///
34    /// `reader` will be used to pull input data for the given operation.
35    pub fn new(reader: R, operation: D) -> Self {
36        Reader {
37            reader,
38            operation,
39            state: State::Reading,
40            single_frame: false,
41            finished_frame: false,
42        }
43    }
44
45    /// Sets `self` to stop after the first decoded frame.
46    pub fn set_single_frame(&mut self) {
47        self.single_frame = true;
48    }
49
50    /// Returns a mutable reference to the underlying operation.
51    pub fn operation_mut(&mut self) -> &mut D {
52        &mut self.operation
53    }
54
55    /// Returns a mutable reference to the underlying reader.
56    pub fn reader_mut(&mut self) -> &mut R {
57        &mut self.reader
58    }
59
60    /// Returns a reference to the underlying reader.
61    pub fn reader(&self) -> &R {
62        &self.reader
63    }
64
65    /// Returns the inner reader.
66    pub fn into_inner(self) -> R {
67        self.reader
68    }
69
70    /// Flush any internal buffer.
71    ///
72    /// For encoders, this ensures all input consumed so far is compressed.
73    pub fn flush(&mut self, output: &mut [u8]) -> io::Result<usize>
74    where
75        D: Operation,
76    {
77        self.operation.flush(&mut OutBuffer::around(output))
78    }
79}
/// Fetches the next chunk of buffered input from `reader`.
///
/// This simply delegates to [`BufRead::fill_buf`]: every error, including
/// [`io::ErrorKind::Interrupted`], is handed straight back to the caller.
//
// The intent is to eventually retry transparently on `Interrupted`:
//
//     loop {
//         match reader.fill_buf() {
//             Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
//             otherwise => return otherwise,
//         }
//     }
//
// That form does not work right now because of the borrow checker, so a
// single, non-retrying call is used as a workaround.
fn fill_buf<R>(reader: &mut R) -> io::Result<&[u8]>
where
    R: BufRead,
{
    reader.fill_buf()
}
104
105impl<R, D> Read for Reader<R, D>
106where
107    R: BufRead,
108    D: Operation,
109{
110    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
111        // Keep trying until _something_ has been written.
112        let mut first = true;
113        loop {
114            match self.state {
115                State::Reading => {
116                    let (bytes_read, bytes_written) = {
117                        // Start with a fresh pool of un-processed data.
118                        // This is the only line that can return an interruption error.
119                        let input = if first {
120                            // eprintln!("First run, no input coming.");
121                            b""
122                        } else {
123                            fill_buf(&mut self.reader)?
124                        };
125
126                        // eprintln!("Input = {:?}", input);
127
128                        // It's possible we don't have any new data to read.
129                        // (In this case we may still have zstd's own buffer to clear.)
130                        if !first && input.is_empty() {
131                            self.state = State::PastEof;
132                            continue;
133                        }
134                        first = false;
135
136                        let mut src = InBuffer::around(input);
137                        let mut dst = OutBuffer::around(buf);
138
139                        // We don't want empty input (from first=true) to cause a frame
140                        // re-initialization.
141                        if self.finished_frame && !input.is_empty() {
142                            // eprintln!("!! Reigniting !!");
143                            self.operation.reinit()?;
144                            self.finished_frame = false;
145                        }
146
147                        // Phase 1: feed input to the operation
148                        let hint = self.operation.run(&mut src, &mut dst)?;
149                        // eprintln!(
150                        //     "Hint={} Just run our operation:\n In={:?}\n Out={:?}",
151                        //     hint, src, dst
152                        // );
153
154                        if hint == 0 {
155                            // In practice this only happens when decoding, when we just finished
156                            // reading a frame.
157                            self.finished_frame = true;
158                            if self.single_frame {
159                                self.state = State::Finished;
160                            }
161                        }
162
163                        // eprintln!("Output: {:?}", dst);
164
165                        (src.pos(), dst.pos())
166                    };
167
168                    self.reader.consume(bytes_read);
169
170                    if bytes_written > 0 {
171                        return Ok(bytes_written);
172                    }
173
174                    // We need more data! Try again!
175                }
176                State::PastEof => {
177                    let mut dst = OutBuffer::around(buf);
178
179                    // We already sent all the input we could get to zstd. Time to flush out the
180                    // buffer and be done with it.
181
182                    // Phase 2: flush out the operation's buffer
183                    // Keep calling `finish()` until the buffer is empty.
184                    let hint = self
185                        .operation
186                        .finish(&mut dst, self.finished_frame)?;
187                    // eprintln!("Hint: {} ; Output: {:?}", hint, dst);
188                    if hint == 0 {
189                        // This indicates that the footer is complete.
190                        // This is the only way to terminate the stream cleanly.
191                        self.state = State::Finished;
192                    }
193
194                    return Ok(dst.pos());
195                }
196                State::Finished => {
197                    return Ok(0);
198                }
199            }
200        }
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::Reader;
    use std::io::{Cursor, Read};

    /// A `NoOp` operation must hand the input back unchanged.
    #[test]
    fn test_noop() {
        use crate::stream::raw::NoOp;

        let input = b"AbcdefghAbcdefgh.";

        // Drain the whole reader into a vector.
        let mut reader = Reader::new(Cursor::new(input), NoOp);
        let mut output = Vec::new();
        reader.read_to_end(&mut output).unwrap();

        assert_eq!(output.as_slice(), &input[..]);
    }

    /// Data compressed through the reader must decode back to the input.
    #[test]
    fn test_compress() {
        use crate::stream::raw::Encoder;

        let input = b"AbcdefghAbcdefgh.";

        // Drain the whole compressing reader into a vector.
        let encoder = Encoder::new(1).unwrap();
        let mut reader = Reader::new(Cursor::new(input), encoder);
        let mut compressed = Vec::new();
        reader.read_to_end(&mut compressed).unwrap();

        let decoded = crate::decode_all(compressed.as_slice()).unwrap();
        assert_eq!(decoded.as_slice(), &input[..]);
    }
}