zstd/stream/zio/reader.rs
1use std::io::{self, BufRead, Read};
2
3use crate::stream::raw::{InBuffer, Operation, OutBuffer};
4
5// [ reader -> zstd ] -> output
6/// Implements the [`Read`] API around an [`Operation`].
7///
8/// This can be used to wrap a raw in-memory operation in a read-focused API.
9///
10/// It can wrap either a compression or decompression operation, and pulls
11/// input data from a wrapped `Read`.
12pub struct Reader<R, D> {
13 reader: R,
14 operation: D,
15
16 state: State,
17
18 single_frame: bool,
19 finished_frame: bool,
20}
21
/// Lifecycle of a `Reader`.
enum State {
    /// The inner `Read` has not hit EOF yet; input is still being pulled from it.
    Reading,
    /// The inner `Read` reported EOF; the operation is now being drained.
    PastEof,
    /// Everything has been emitted; no more data can be read.
    Finished,
}
30
31impl<R, D> Reader<R, D> {
32 /// Creates a new `Reader`.
33 ///
34 /// `reader` will be used to pull input data for the given operation.
35 pub fn new(reader: R, operation: D) -> Self {
36 Reader {
37 reader,
38 operation,
39 state: State::Reading,
40 single_frame: false,
41 finished_frame: false,
42 }
43 }
44
45 /// Sets `self` to stop after the first decoded frame.
46 pub fn set_single_frame(&mut self) {
47 self.single_frame = true;
48 }
49
50 /// Returns a mutable reference to the underlying operation.
51 pub fn operation_mut(&mut self) -> &mut D {
52 &mut self.operation
53 }
54
55 /// Returns a mutable reference to the underlying reader.
56 pub fn reader_mut(&mut self) -> &mut R {
57 &mut self.reader
58 }
59
60 /// Returns a reference to the underlying reader.
61 pub fn reader(&self) -> &R {
62 &self.reader
63 }
64
65 /// Returns the inner reader.
66 pub fn into_inner(self) -> R {
67 self.reader
68 }
69
70 /// Flush any internal buffer.
71 ///
72 /// For encoders, this ensures all input consumed so far is compressed.
73 pub fn flush(&mut self, output: &mut [u8]) -> io::Result<usize>
74 where
75 D: Operation,
76 {
77 self.operation.flush(&mut OutBuffer::around(output))
78 }
79}
/// Returns the next chunk of buffered input from `reader`, retrying when the
/// underlying read is interrupted.
///
/// `io::ErrorKind::Interrupted` is non-fatal under the `Read` contract, so it
/// is retried here instead of being surfaced to the caller. An empty slice
/// means `reader` reached EOF.
///
/// # Errors
///
/// Any error from `BufRead::fill_buf` other than `Interrupted` is returned
/// unchanged.
fn fill_buf<R>(reader: &mut R) -> io::Result<&[u8]>
where
    R: BufRead,
{
    // Returning the buffer straight out of a retry loop is rejected by the
    // current borrow checker: the conditional return would keep `reader`
    // borrowed across iterations. Instead, loop until one call succeeds, then
    // ask for the buffer again. Per the `BufRead` contract, a non-empty buffer
    // is handed back again without performing new I/O.
    loop {
        match reader.fill_buf() {
            // EOF: answer with a fresh empty slice rather than polling the
            // reader a second time, which could block (e.g. interactive stdin).
            Ok(buf) if buf.is_empty() => {
                let empty: &[u8] = &[];
                return Ok(empty);
            }
            Ok(_) => break,
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    reader.fill_buf()
}
104
105impl<R, D> Read for Reader<R, D>
106where
107 R: BufRead,
108 D: Operation,
109{
110 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
111 // Keep trying until _something_ has been written.
112 let mut first = true;
113 loop {
114 match self.state {
115 State::Reading => {
116 let (bytes_read, bytes_written) = {
117 // Start with a fresh pool of un-processed data.
118 // This is the only line that can return an interruption error.
119 let input = if first {
120 // eprintln!("First run, no input coming.");
121 b""
122 } else {
123 fill_buf(&mut self.reader)?
124 };
125
126 // eprintln!("Input = {:?}", input);
127
128 // It's possible we don't have any new data to read.
129 // (In this case we may still have zstd's own buffer to clear.)
130 if !first && input.is_empty() {
131 self.state = State::PastEof;
132 continue;
133 }
134 first = false;
135
136 let mut src = InBuffer::around(input);
137 let mut dst = OutBuffer::around(buf);
138
139 // We don't want empty input (from first=true) to cause a frame
140 // re-initialization.
141 if self.finished_frame && !input.is_empty() {
142 // eprintln!("!! Reigniting !!");
143 self.operation.reinit()?;
144 self.finished_frame = false;
145 }
146
147 // Phase 1: feed input to the operation
148 let hint = self.operation.run(&mut src, &mut dst)?;
149 // eprintln!(
150 // "Hint={} Just run our operation:\n In={:?}\n Out={:?}",
151 // hint, src, dst
152 // );
153
154 if hint == 0 {
155 // In practice this only happens when decoding, when we just finished
156 // reading a frame.
157 self.finished_frame = true;
158 if self.single_frame {
159 self.state = State::Finished;
160 }
161 }
162
163 // eprintln!("Output: {:?}", dst);
164
165 (src.pos(), dst.pos())
166 };
167
168 self.reader.consume(bytes_read);
169
170 if bytes_written > 0 {
171 return Ok(bytes_written);
172 }
173
174 // We need more data! Try again!
175 }
176 State::PastEof => {
177 let mut dst = OutBuffer::around(buf);
178
179 // We already sent all the input we could get to zstd. Time to flush out the
180 // buffer and be done with it.
181
182 // Phase 2: flush out the operation's buffer
183 // Keep calling `finish()` until the buffer is empty.
184 let hint = self
185 .operation
186 .finish(&mut dst, self.finished_frame)?;
187 // eprintln!("Hint: {} ; Output: {:?}", hint, dst);
188 if hint == 0 {
189 // This indicates that the footer is complete.
190 // This is the only way to terminate the stream cleanly.
191 self.state = State::Finished;
192 }
193
194 return Ok(dst.pos());
195 }
196 State::Finished => {
197 return Ok(0);
198 }
199 }
200 }
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::Reader;
    use std::io::{Cursor, Read};

    #[test]
    fn test_noop() {
        use crate::stream::raw::NoOp;

        let input: &[u8] = b"AbcdefghAbcdefgh.";

        let output = {
            let mut reader = Reader::new(Cursor::new(input), NoOp);
            let mut collected = Vec::new();
            reader.read_to_end(&mut collected).unwrap();
            collected
        };

        // A no-op operation must hand the bytes back untouched.
        assert_eq!(output, input);
    }

    #[test]
    fn test_compress() {
        use crate::stream::raw::Encoder;

        let input: &[u8] = b"AbcdefghAbcdefgh.";

        let compressed = {
            let encoder = Encoder::new(1).unwrap();
            let mut reader = Reader::new(Cursor::new(input), encoder);
            let mut collected = Vec::new();
            reader.read_to_end(&mut collected).unwrap();
            collected
        };

        // Whatever the reader produced must round-trip through the one-shot
        // decoder back to the original bytes.
        let decoded = crate::decode_all(compressed.as_slice()).unwrap();
        assert_eq!(decoded, input);
    }
}