zstd/stream/write/mod.rs
1//! Implement push-based [`Write`] trait for both compressing and decompressing.
2use std::io::{self, Write};
3
4use zstd_safe;
5
6use crate::dict::{DecoderDictionary, EncoderDictionary};
7use crate::stream::{raw, zio};
8
9#[cfg(test)]
10mod tests;
11
12/// An encoder that compress and forward data to another writer.
13///
14/// This allows to compress a stream of data
15/// (good for files or heavy network stream).
16///
17/// Don't forget to call [`finish()`] before dropping it!
18///
19/// Alternatively, you can call [`auto_finish()`] to use an
20/// [`AutoFinishEncoder`] that will finish on drop.
21///
22/// Note: The zstd library has its own internal input buffer (~128kb).
23///
24/// [`finish()`]: #method.finish
25/// [`auto_finish()`]: #method.auto_finish
26/// [`AutoFinishEncoder`]: AutoFinishEncoder
27pub struct Encoder<'a, W: Write> {
28 // output writer (compressed data)
29 writer: zio::Writer<W, raw::Encoder<'a>>,
30}
31
32/// A decoder that decompress and forward data to another writer.
33///
34/// Note that you probably want to `flush()` after writing your stream content.
35/// You can use [`auto_flush()`] to automatically flush the writer on drop.
36///
37/// [`auto_flush()`]: Decoder::auto_flush
38pub struct Decoder<'a, W: Write> {
39 // output writer (decompressed data)
40 writer: zio::Writer<W, raw::Decoder<'a>>,
41}
42
43/// A wrapper around an `Encoder<W>` that finishes the stream on drop.
44///
45/// This can be created by the [`auto_finish()`] method on the [`Encoder`].
46///
47/// [`auto_finish()`]: Encoder::auto_finish
48/// [`Encoder`]: Encoder
49pub struct AutoFinishEncoder<
50 'a,
51 W: Write,
52 F: FnMut(io::Result<W>) = Box<dyn Send + FnMut(io::Result<W>)>,
53> {
54 // We wrap this in an option to take it during drop.
55 encoder: Option<Encoder<'a, W>>,
56
57 on_finish: Option<F>,
58}
59
60/// A wrapper around a `Decoder<W>` that flushes the stream on drop.
61///
62/// This can be created by the [`auto_flush()`] method on the [`Decoder`].
63///
64/// [`auto_flush()`]: Decoder::auto_flush
65/// [`Decoder`]: Decoder
66pub struct AutoFlushDecoder<
67 'a,
68 W: Write,
69 F: FnMut(io::Result<()>) = Box<dyn Send + FnMut(io::Result<()>)>,
70> {
71 // We wrap this in an option to take it during drop.
72 decoder: Option<Decoder<'a, W>>,
73
74 on_flush: Option<F>,
75}
76
77impl<'a, W: Write, F: FnMut(io::Result<()>)> AutoFlushDecoder<'a, W, F> {
78 fn new(decoder: Decoder<'a, W>, on_flush: F) -> Self {
79 AutoFlushDecoder {
80 decoder: Some(decoder),
81 on_flush: Some(on_flush),
82 }
83 }
84
85 /// Acquires a reference to the underlying writer.
86 pub fn get_ref(&self) -> &W {
87 self.decoder.as_ref().unwrap().get_ref()
88 }
89
90 /// Acquires a mutable reference to the underlying writer.
91 ///
92 /// Note that mutation of the writer may result in surprising results if
93 /// this decoder is continued to be used.
94 ///
95 /// Mostly used for testing purposes.
96 pub fn get_mut(&mut self) -> &mut W {
97 self.decoder.as_mut().unwrap().get_mut()
98 }
99}
100
101impl<W, F> Drop for AutoFlushDecoder<'_, W, F>
102where
103 W: Write,
104 F: FnMut(io::Result<()>),
105{
106 fn drop(&mut self) {
107 let mut decoder = self.decoder.take().unwrap();
108 let result = decoder.flush();
109 if let Some(mut on_finish) = self.on_flush.take() {
110 on_finish(result);
111 }
112 }
113}
114
115impl<W: Write, F: FnMut(io::Result<()>)> Write for AutoFlushDecoder<'_, W, F> {
116 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
117 self.decoder.as_mut().unwrap().write(buf)
118 }
119
120 fn flush(&mut self) -> io::Result<()> {
121 self.decoder.as_mut().unwrap().flush()
122 }
123}
124
125impl<'a, W: Write, F: FnMut(io::Result<W>)> AutoFinishEncoder<'a, W, F> {
126 fn new(encoder: Encoder<'a, W>, on_finish: F) -> Self {
127 AutoFinishEncoder {
128 encoder: Some(encoder),
129 on_finish: Some(on_finish),
130 }
131 }
132
133 /// Acquires a reference to the underlying writer.
134 pub fn get_ref(&self) -> &W {
135 self.encoder.as_ref().unwrap().get_ref()
136 }
137
138 /// Acquires a mutable reference to the underlying writer.
139 ///
140 /// Note that mutation of the writer may result in surprising results if
141 /// this encoder is continued to be used.
142 ///
143 /// Mostly used for testing purposes.
144 pub fn get_mut(&mut self) -> &mut W {
145 self.encoder.as_mut().unwrap().get_mut()
146 }
147}
148
149impl<W: Write, F: FnMut(io::Result<W>)> Drop for AutoFinishEncoder<'_, W, F> {
150 fn drop(&mut self) {
151 let result = self.encoder.take().unwrap().finish();
152 if let Some(mut on_finish) = self.on_finish.take() {
153 on_finish(result);
154 }
155 }
156}
157
158impl<W: Write, F: FnMut(io::Result<W>)> Write for AutoFinishEncoder<'_, W, F> {
159 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
160 self.encoder.as_mut().unwrap().write(buf)
161 }
162
163 fn flush(&mut self) -> io::Result<()> {
164 self.encoder.as_mut().unwrap().flush()
165 }
166}
167
168impl<W: Write> Encoder<'static, W> {
169 /// Creates a new encoder.
170 ///
171 /// `level`: compression level (1-22).
172 ///
173 /// A level of `0` uses zstd's default (currently `3`).
174 pub fn new(writer: W, level: i32) -> io::Result<Self> {
175 Self::with_dictionary(writer, level, &[])
176 }
177
178 /// Creates a new encoder, using an existing dictionary.
179 ///
180 /// (Provides better compression ratio for small files,
181 /// but requires the dictionary to be present during decompression.)
182 ///
183 /// A level of `0` uses zstd's default (currently `3`).
184 pub fn with_dictionary(
185 writer: W,
186 level: i32,
187 dictionary: &[u8],
188 ) -> io::Result<Self> {
189 let encoder = raw::Encoder::with_dictionary(level, dictionary)?;
190 Ok(Self::with_encoder(writer, encoder))
191 }
192}
193
194impl<'a, W: Write> Encoder<'a, W> {
195 /// Creates a new encoder from a prepared zio writer.
196 pub fn with_writer(writer: zio::Writer<W, raw::Encoder<'a>>) -> Self {
197 Self { writer }
198 }
199
200 /// Creates a new encoder from the given `Write` and raw encoder.
201 pub fn with_encoder(writer: W, encoder: raw::Encoder<'a>) -> Self {
202 let writer = zio::Writer::new(writer, encoder);
203 Self::with_writer(writer)
204 }
205
206 /// Creates an encoder that uses the provided context to compress a stream.
207 pub fn with_context(
208 writer: W,
209 context: &'a mut zstd_safe::CCtx<'static>,
210 ) -> Self {
211 let encoder = raw::Encoder::with_context(context);
212 Self::with_encoder(writer, encoder)
213 }
214
215 /// Creates a new encoder, using an existing prepared `EncoderDictionary`.
216 ///
217 /// (Provides better compression ratio for small files,
218 /// but requires the dictionary to be present during decompression.)
219 pub fn with_prepared_dictionary<'b>(
220 writer: W,
221 dictionary: &EncoderDictionary<'b>,
222 ) -> io::Result<Self>
223 where
224 'b: 'a,
225 {
226 let encoder = raw::Encoder::with_prepared_dictionary(dictionary)?;
227 Ok(Self::with_encoder(writer, encoder))
228 }
229
230 /// Creates a new encoder, using a ref prefix
231 pub fn with_ref_prefix<'b>(
232 writer: W,
233 level: i32,
234 ref_prefix: &'b [u8],
235 ) -> io::Result<Self>
236 where
237 'b: 'a,
238 {
239 let encoder = raw::Encoder::with_ref_prefix(level, ref_prefix)?;
240 Ok(Self::with_encoder(writer, encoder))
241 }
242
243 /// Returns a wrapper around `self` that will finish the stream on drop.
244 pub fn auto_finish(self) -> AutoFinishEncoder<'a, W> {
245 AutoFinishEncoder {
246 encoder: Some(self),
247 on_finish: None,
248 }
249 }
250
251 /// Returns an encoder that will finish the stream on drop.
252 ///
253 /// Calls the given callback with the result from `finish()`. This runs during drop so it's
254 /// important that the provided callback doesn't panic.
255 pub fn on_finish<F: FnMut(io::Result<W>)>(
256 self,
257 f: F,
258 ) -> AutoFinishEncoder<'a, W, F> {
259 AutoFinishEncoder::new(self, f)
260 }
261
262 /// Acquires a reference to the underlying writer.
263 pub fn get_ref(&self) -> &W {
264 self.writer.writer()
265 }
266
267 /// Acquires a mutable reference to the underlying writer.
268 ///
269 /// Note that mutation of the writer may result in surprising results if
270 /// this encoder is continued to be used.
271 pub fn get_mut(&mut self) -> &mut W {
272 self.writer.writer_mut()
273 }
274
275 /// **Required**: Finishes the stream.
276 ///
277 /// You *need* to finish the stream when you're done writing, either with
278 /// this method or with [`try_finish(self)`](#method.try_finish).
279 ///
280 /// This returns the inner writer in case you need it.
281 ///
282 /// To get back `self` in case an error happened, use `try_finish`.
283 ///
284 /// **Note**: If you don't want (or can't) call `finish()` manually after
285 /// writing your data, consider using `auto_finish()` to get an
286 /// `AutoFinishEncoder`.
287 pub fn finish(self) -> io::Result<W> {
288 self.try_finish().map_err(|(_, err)| err)
289 }
290
291 /// **Required**: Attempts to finish the stream.
292 ///
293 /// You *need* to finish the stream when you're done writing, either with
294 /// this method or with [`finish(self)`](#method.finish).
295 ///
296 /// This returns the inner writer if the finish was successful, or the
297 /// object plus an error if it wasn't.
298 ///
299 /// `write` on this object will panic after `try_finish` has been called,
300 /// even if it fails.
301 pub fn try_finish(mut self) -> Result<W, (Self, io::Error)> {
302 match self.writer.finish() {
303 // Return the writer, because why not
304 Ok(()) => Ok(self.writer.into_inner().0),
305 Err(e) => Err((self, e)),
306 }
307 }
308
309 /// Attempts to finish the stream.
310 ///
311 /// You *need* to finish the stream when you're done writing, either with
312 /// this method or with [`finish(self)`](#method.finish).
313 pub fn do_finish(&mut self) -> io::Result<()> {
314 self.writer.finish()
315 }
316
317 /// Return a recommendation for the size of data to write at once.
318 pub fn recommended_input_size() -> usize {
319 zstd_safe::CCtx::in_size()
320 }
321
322 crate::encoder_common!(writer);
323}
324
325impl<'a, W: Write> Write for Encoder<'a, W> {
326 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
327 self.writer.write(buf)
328 }
329
330 fn flush(&mut self) -> io::Result<()> {
331 self.writer.flush()
332 }
333}
334
335impl<W: Write> Decoder<'static, W> {
336 /// Creates a new decoder.
337 pub fn new(writer: W) -> io::Result<Self> {
338 Self::with_dictionary(writer, &[])
339 }
340
341 /// Creates a new decoder, using an existing dictionary.
342 ///
343 /// (Provides better compression ratio for small files,
344 /// but requires the dictionary to be present during decompression.)
345 pub fn with_dictionary(writer: W, dictionary: &[u8]) -> io::Result<Self> {
346 let decoder = raw::Decoder::with_dictionary(dictionary)?;
347 Ok(Self::with_decoder(writer, decoder))
348 }
349}
350
351impl<'a, W: Write> Decoder<'a, W> {
352 /// Creates a new decoder around the given prepared zio writer.
353 ///
354 /// # Examples
355 ///
356 /// ```rust
357 /// fn wrap<W: std::io::Write>(writer: W) -> zstd::stream::write::Decoder<'static, W> {
358 /// let decoder = zstd::stream::raw::Decoder::new().unwrap();
359 /// let writer = zstd::stream::zio::Writer::new(writer, decoder);
360 /// zstd::stream::write::Decoder::with_writer(writer)
361 /// }
362 /// ```
363 pub fn with_writer(writer: zio::Writer<W, raw::Decoder<'a>>) -> Self {
364 Decoder { writer }
365 }
366
367 /// Creates a new decoder around the given `Write` and raw decoder.
368 pub fn with_decoder(writer: W, decoder: raw::Decoder<'a>) -> Self {
369 let writer = zio::Writer::new(writer, decoder);
370 Decoder { writer }
371 }
372
373 /// Creates a new decoder, using an existing prepared `DecoderDictionary`.
374 ///
375 /// (Provides better compression ratio for small files,
376 /// but requires the dictionary to be present during decompression.)
377 pub fn with_prepared_dictionary<'b>(
378 writer: W,
379 dictionary: &DecoderDictionary<'b>,
380 ) -> io::Result<Self>
381 where
382 'b: 'a,
383 {
384 let decoder = raw::Decoder::with_prepared_dictionary(dictionary)?;
385 Ok(Self::with_decoder(writer, decoder))
386 }
387
388 /// Acquires a reference to the underlying writer.
389 pub fn get_ref(&self) -> &W {
390 self.writer.writer()
391 }
392
393 /// Acquires a mutable reference to the underlying writer.
394 ///
395 /// Note that mutation of the writer may result in surprising results if
396 /// this decoder is continued to be used.
397 pub fn get_mut(&mut self) -> &mut W {
398 self.writer.writer_mut()
399 }
400
401 /// Returns the inner `Write`.
402 pub fn into_inner(self) -> W {
403 self.writer.into_inner().0
404 }
405
406 /// Return a recommendation for the size of data to write at once.
407 pub fn recommended_input_size() -> usize {
408 zstd_safe::DCtx::in_size()
409 }
410
411 /// Returns a wrapper around `self` that will flush the stream on drop.
412 pub fn auto_flush(self) -> AutoFlushDecoder<'a, W> {
413 AutoFlushDecoder {
414 decoder: Some(self),
415 on_flush: None,
416 }
417 }
418
419 /// Returns a decoder that will flush the stream on drop.
420 ///
421 /// Calls the given callback with the result from `flush()`. This runs during drop so it's
422 /// important that the provided callback doesn't panic.
423 pub fn on_flush<F: FnMut(io::Result<()>)>(
424 self,
425 f: F,
426 ) -> AutoFlushDecoder<'a, W, F> {
427 AutoFlushDecoder::new(self, f)
428 }
429
430 crate::decoder_common!(writer);
431}
432
433impl<W: Write> Write for Decoder<'_, W> {
434 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
435 self.writer.write(buf)
436 }
437
438 fn flush(&mut self) -> io::Result<()> {
439 self.writer.flush()
440 }
441}
442
443fn _assert_traits() {
444 fn _assert_send<T: Send>(_: T) {}
445
446 _assert_send(Decoder::new(Vec::new()));
447 _assert_send(Encoder::new(Vec::new(), 1));
448 _assert_send(Decoder::new(Vec::new()).unwrap().auto_flush());
449 _assert_send(Encoder::new(Vec::new(), 1).unwrap().auto_finish());
450}