lz4_flex/frame/
compress.rs

1use std::{
2    fmt,
3    hash::Hasher,
4    io::{self, Write},
5};
6use twox_hash::XxHash32;
7
8use crate::{
9    block::{
10        compress::compress_internal,
11        hashtable::{HashTable, HashTable4K},
12    },
13    sink::vec_sink_for_compression,
14};
15
16use super::Error;
17use super::{
18    header::{BlockInfo, BlockMode, FrameInfo, BLOCK_INFO_SIZE, MAX_FRAME_INFO_SIZE},
19    BlockSize,
20};
21use crate::block::WINDOW_SIZE;
22
23/// A writer for compressing a LZ4 stream.
24///
25/// This `FrameEncoder` wraps any other writer that implements `io::Write`.
26/// Bytes written to this writer are compressed using the [LZ4 frame
27/// format](https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md).
28///
29/// Writes are buffered automatically, so there's no need to wrap the given
30/// writer in a `std::io::BufWriter`.
31///
32/// To ensure a well formed stream the encoder must be finalized by calling
33/// either the [`finish()`], [`try_finish()`], or [`auto_finish()`] methods.
34///
35/// [`finish()`]: Self::finish
36/// [`try_finish()`]: Self::try_finish
37/// [`auto_finish()`]: Self::auto_finish
38///
39/// # Example 1
40/// Serializing json values into a compressed file.
41///
42/// ```no_run
43/// let compressed_file = std::fs::File::create("datafile").unwrap();
44/// let mut compressor = lz4_flex::frame::FrameEncoder::new(compressed_file);
45/// serde_json::to_writer(&mut compressor, &serde_json::json!({ "an": "object" })).unwrap();
46/// compressor.finish().unwrap();
47/// ```
48///
49/// # Example 2
50/// Serializing multiple json values into a compressed file using linked blocks.
51///
52/// ```no_run
53/// let compressed_file = std::fs::File::create("datafile").unwrap();
54/// let mut frame_info = lz4_flex::frame::FrameInfo::new();
55/// frame_info.block_mode = lz4_flex::frame::BlockMode::Linked;
56/// let mut compressor = lz4_flex::frame::FrameEncoder::with_frame_info(frame_info, compressed_file);
57/// for i in 0..10u64 {
58///     serde_json::to_writer(&mut compressor, &serde_json::json!({ "i": i })).unwrap();
59/// }
60/// compressor.finish().unwrap();
61/// ```
62pub struct FrameEncoder<W: io::Write> {
63    /// Our buffer of uncompressed bytes.
64    src: Vec<u8>,
65    /// Index into src: starting point of bytes not yet compressed
66    src_start: usize,
67    /// Index into src: end point of bytes not not yet compressed
68    src_end: usize,
69    /// Index into src: starting point of external dictionary (applicable in Linked block mode)
70    ext_dict_offset: usize,
71    /// Length of external dictionary
72    ext_dict_len: usize,
73    /// Counter of bytes already compressed to the compression_table
74    /// _Not_ the same as `content_len` as this is reset every to 2GB.
75    src_stream_offset: usize,
76    /// Encoder table
77    compression_table: HashTable4K,
78    /// The underlying writer.
79    w: W,
80    /// Xxhash32 used when content checksum is enabled.
81    content_hasher: XxHash32,
82    /// Number of bytes compressed
83    content_len: u64,
84    /// The compressed bytes buffer. Bytes are compressed from src (usually)
85    /// to dst before being written to w.
86    dst: Vec<u8>,
87    /// Whether we have an open frame in the output.
88    is_frame_open: bool,
89    /// Whether we have an frame closed in the output.
90    data_to_frame_written: bool,
91    /// The frame information to be used in this encoder.
92    frame_info: FrameInfo,
93}
94
95impl<W: io::Write> FrameEncoder<W> {
96    fn init(&mut self) {
97        let max_block_size = self.frame_info.block_size.get_size();
98        let src_size = if self.frame_info.block_mode == BlockMode::Linked {
99            // In linked mode we consume the input (bumping src_start) but leave the
100            // beginning of src to be used as a prefix in subsequent blocks.
101            // That is at least until we have at least `max_block_size + WINDOW_SIZE`
102            // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes
103            // and the input goes to the beginning of src again.
104            // Since we always want to be able to write a full block (up to max_block_size)
105            // we need a buffer with at least `max_block_size * 2 + WINDOW_SIZE` bytes.
106            max_block_size * 2 + WINDOW_SIZE
107        } else {
108            max_block_size
109        };
110        // Since this method is called potentially multiple times, don't reserve _additional_
111        // capacity if not required.
112        self.src
113            .reserve(src_size.saturating_sub(self.src.capacity()));
114        self.dst.reserve(
115            crate::block::compress::get_maximum_output_size(max_block_size)
116                .saturating_sub(self.dst.capacity()),
117        );
118    }
119
120    /// Returns a wrapper around `self` that will finish the stream on drop.
121    ///
122    /// # Note
123    /// Errors on drop get silently ignored. If you want to handle errors then use [`finish()`] or
124    /// [`try_finish()`] instead.
125    ///
126    /// [`finish()`]: Self::finish
127    /// [`try_finish()`]: Self::try_finish
128    pub fn auto_finish(self) -> AutoFinishEncoder<W> {
129        AutoFinishEncoder {
130            encoder: Some(self),
131        }
132    }
133
134    /// Creates a new Encoder with the specified FrameInfo.
135    pub fn with_frame_info(frame_info: FrameInfo, wtr: W) -> Self {
136        FrameEncoder {
137            src: Vec::new(),
138            w: wtr,
139            // 16 KB hash table for matches, same as the reference implementation.
140            compression_table: HashTable4K::new(),
141            content_hasher: XxHash32::with_seed(0),
142            content_len: 0,
143            dst: Vec::new(),
144            is_frame_open: false,
145            data_to_frame_written: false,
146            frame_info,
147            src_start: 0,
148            src_end: 0,
149            ext_dict_offset: 0,
150            ext_dict_len: 0,
151            src_stream_offset: 0,
152        }
153    }
154
155    /// Creates a new Encoder with the default settings.
156    pub fn new(wtr: W) -> Self {
157        Self::with_frame_info(Default::default(), wtr)
158    }
159
160    /// The frame information used by this Encoder.
161    pub fn frame_info(&mut self) -> &FrameInfo {
162        &self.frame_info
163    }
164
165    /// Consumes this encoder, flushing internal buffer and writing stream terminator.
166    pub fn finish(mut self) -> Result<W, Error> {
167        self.try_finish()?;
168        Ok(self.w)
169    }
170
171    /// Attempt to finish this output stream, flushing internal buffer and writing stream
172    /// terminator.
173    pub fn try_finish(&mut self) -> Result<(), Error> {
174        match self.flush() {
175            Ok(()) => {
176                // Empty input special case
177                // https://github.com/ouch-org/ouch/pull/163#discussion_r1108965151
178                if !self.is_frame_open && !self.data_to_frame_written {
179                    self.begin_frame(0)?;
180                }
181                self.end_frame()?;
182                self.data_to_frame_written = true;
183                Ok(())
184            }
185            Err(err) => Err(err.into()),
186        }
187    }
188
189    /// Returns the underlying writer _without_ flushing the stream.
190    /// This may leave the output in an unfinished state.
191    pub fn into_inner(self) -> W {
192        self.w
193    }
194
195    /// Gets a reference to the underlying writer in this encoder.
196    pub fn get_ref(&self) -> &W {
197        &self.w
198    }
199
200    /// Gets a reference to the underlying writer in this encoder.
201    ///
202    /// Note that mutating the output/input state of the stream may corrupt
203    /// this encoder, so care must be taken when using this method.
204    pub fn get_mut(&mut self) -> &mut W {
205        &mut self.w
206    }
207
208    /// Closes the frame by writing the end marker.
209    fn end_frame(&mut self) -> Result<(), Error> {
210        debug_assert!(self.is_frame_open);
211        self.is_frame_open = false;
212        if let Some(expected) = self.frame_info.content_size {
213            if expected != self.content_len {
214                return Err(Error::ContentLengthError {
215                    expected,
216                    actual: self.content_len,
217                });
218            }
219        }
220
221        let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE];
222        BlockInfo::EndMark.write(&mut block_info_buffer[..])?;
223        self.w.write_all(&block_info_buffer[..])?;
224        if self.frame_info.content_checksum {
225            let content_checksum = self.content_hasher.finish() as u32;
226            self.w.write_all(&content_checksum.to_le_bytes())?;
227        }
228
229        Ok(())
230    }
231
232    /// Begin the frame by writing the frame header.
233    /// It'll also setup the encoder for compressing blocks for the the new frame.
234    fn begin_frame(&mut self, buf_len: usize) -> io::Result<()> {
235        self.is_frame_open = true;
236        if self.frame_info.block_size == BlockSize::Auto {
237            self.frame_info.block_size = BlockSize::from_buf_length(buf_len);
238        }
239        self.init();
240        let mut frame_info_buffer = [0u8; MAX_FRAME_INFO_SIZE];
241        let size = self.frame_info.write(&mut frame_info_buffer)?;
242        self.w.write_all(&frame_info_buffer[..size])?;
243
244        if self.content_len != 0 {
245            // This is the second or later frame for this Encoder,
246            // reset compressor state for the new frame.
247            self.content_len = 0;
248            self.src_stream_offset = 0;
249            self.src.clear();
250            self.src_start = 0;
251            self.src_end = 0;
252            self.ext_dict_len = 0;
253            self.content_hasher = XxHash32::with_seed(0);
254            self.compression_table.clear();
255        }
256        Ok(())
257    }
258
259    /// Consumes the src contents between src_start and src_end,
260    /// which shouldn't exceed the max block size.
261    fn write_block(&mut self) -> io::Result<()> {
262        debug_assert!(self.is_frame_open);
263        let max_block_size = self.frame_info.block_size.get_size();
264        debug_assert!(self.src_end - self.src_start <= max_block_size);
265
266        // Reposition the compression table if we're anywhere near an overflowing hazard
267        if self.src_stream_offset + max_block_size + WINDOW_SIZE >= u32::MAX as usize / 2 {
268            self.compression_table
269                .reposition((self.src_stream_offset - self.ext_dict_len) as _);
270            self.src_stream_offset = self.ext_dict_len;
271        }
272
273        // input to the compressor, which may include a prefix when blocks are linked
274        let input = &self.src[..self.src_end];
275        // the contents of the block are between src_start and src_end
276        let src = &input[self.src_start..];
277
278        let dst_required_size = crate::block::compress::get_maximum_output_size(src.len());
279
280        let compress_result = if self.ext_dict_len != 0 {
281            debug_assert_eq!(self.frame_info.block_mode, BlockMode::Linked);
282            compress_internal::<_, true, _>(
283                input,
284                self.src_start,
285                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
286                &mut self.compression_table,
287                &self.src[self.ext_dict_offset..self.ext_dict_offset + self.ext_dict_len],
288                self.src_stream_offset,
289            )
290        } else {
291            compress_internal::<_, false, _>(
292                input,
293                self.src_start,
294                &mut vec_sink_for_compression(&mut self.dst, 0, 0, dst_required_size),
295                &mut self.compression_table,
296                b"",
297                self.src_stream_offset,
298            )
299        };
300
301        let (block_info, block_data) = match compress_result.map_err(Error::CompressionError)? {
302            comp_len if comp_len < src.len() => {
303                (BlockInfo::Compressed(comp_len as _), &self.dst[..comp_len])
304            }
305            _ => (BlockInfo::Uncompressed(src.len() as _), src),
306        };
307
308        // Write the (un)compressed block to the writer and the block checksum (if applicable).
309        let mut block_info_buffer = [0u8; BLOCK_INFO_SIZE];
310        block_info.write(&mut block_info_buffer[..])?;
311        self.w.write_all(&block_info_buffer[..])?;
312        self.w.write_all(block_data)?;
313        if self.frame_info.block_checksums {
314            let mut block_hasher = XxHash32::with_seed(0);
315            block_hasher.write(block_data);
316            let block_checksum = block_hasher.finish() as u32;
317            self.w.write_all(&block_checksum.to_le_bytes())?;
318        }
319
320        // Content checksum, if applicable
321        if self.frame_info.content_checksum {
322            self.content_hasher.write(src);
323        }
324
325        // Buffer and offsets maintenance
326        self.content_len += src.len() as u64;
327        self.src_start += src.len();
328        debug_assert_eq!(self.src_start, self.src_end);
329        if self.frame_info.block_mode == BlockMode::Linked {
330            // In linked mode we consume the input (bumping src_start) but leave the
331            // beginning of src to be used as a prefix in subsequent blocks.
332            // That is at least until we have at least `max_block_size + WINDOW_SIZE`
333            // bytes in src, then we setup an ext_dict with the last WINDOW_SIZE bytes
334            // and the input goes to the beginning of src again.
335            debug_assert_eq!(self.src.capacity(), max_block_size * 2 + WINDOW_SIZE);
336            if self.src_start >= max_block_size + WINDOW_SIZE {
337                // The ext_dict will become the last WINDOW_SIZE bytes
338                self.ext_dict_offset = self.src_end - WINDOW_SIZE;
339                self.ext_dict_len = WINDOW_SIZE;
340                // Input goes in the beginning of the buffer again.
341                self.src_stream_offset += self.src_end;
342                self.src_start = 0;
343                self.src_end = 0;
344            } else if self.src_start + self.ext_dict_len > WINDOW_SIZE {
345                // There's more than WINDOW_SIZE bytes of lookback adding the prefix and ext_dict.
346                // Since we have a limited buffer we must shrink ext_dict in favor of the prefix,
347                // so that we can fit up to max_block_size bytes between dst_start and ext_dict
348                // start.
349                let delta = self
350                    .ext_dict_len
351                    .min(self.src_start + self.ext_dict_len - WINDOW_SIZE);
352                self.ext_dict_offset += delta;
353                self.ext_dict_len -= delta;
354                debug_assert!(self.src_start + self.ext_dict_len >= WINDOW_SIZE)
355            }
356            debug_assert!(
357                self.ext_dict_len == 0 || self.src_start + max_block_size <= self.ext_dict_offset
358            );
359        } else {
360            // In independent block mode we consume the entire src buffer
361            // which is sized equal to the frame max_block_size.
362            debug_assert_eq!(self.ext_dict_len, 0);
363            debug_assert_eq!(self.src.capacity(), max_block_size);
364            self.src_start = 0;
365            self.src_end = 0;
366            // Advance stream offset so we don't have to reset the match dict
367            // for the next block.
368            self.src_stream_offset += src.len();
369        }
370        debug_assert!(self.src_start <= self.src_end);
371        debug_assert!(self.src_start + max_block_size <= self.src.capacity());
372        Ok(())
373    }
374}
375
376impl<W: io::Write> io::Write for FrameEncoder<W> {
377    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
378        if !self.is_frame_open && !buf.is_empty() {
379            self.begin_frame(buf.len())?;
380        }
381        let buf_len = buf.len();
382        while !buf.is_empty() {
383            let src_filled = self.src_end - self.src_start;
384            let max_fill_len = self.frame_info.block_size.get_size() - src_filled;
385            if max_fill_len == 0 {
386                // make space by writing next block
387                self.write_block()?;
388                debug_assert_eq!(self.src_end, self.src_start);
389                continue;
390            }
391
392            let fill_len = max_fill_len.min(buf.len());
393            vec_copy_overwriting(&mut self.src, self.src_end, &buf[..fill_len]);
394            buf = &buf[fill_len..];
395            self.src_end += fill_len;
396        }
397        Ok(buf_len)
398    }
399
400    fn flush(&mut self) -> io::Result<()> {
401        if self.src_start != self.src_end {
402            self.write_block()?;
403        }
404        Ok(())
405    }
406}
407
408/// A wrapper around an [`FrameEncoder<W>`] that finishes the stream on drop.
409///
410/// This can be created by the [`auto_finish()`] method on the [`FrameEncoder<W>`].
411///
412/// # Note
413/// Errors on drop get silently ignored. If you want to handle errors then use [`finish()`] or
414/// [`try_finish()`] instead.
415///
416/// [`finish()`]: FrameEncoder::finish
417/// [`try_finish()`]: FrameEncoder::try_finish
418/// [`auto_finish()`]: FrameEncoder::auto_finish
419pub struct AutoFinishEncoder<W: Write> {
420    // We wrap this in an option to take it during drop.
421    encoder: Option<FrameEncoder<W>>,
422}
423
424impl<W: io::Write> Drop for AutoFinishEncoder<W> {
425    fn drop(&mut self) {
426        if let Some(mut encoder) = self.encoder.take() {
427            let _ = encoder.try_finish();
428        }
429    }
430}
431
432impl<W: Write> Write for AutoFinishEncoder<W> {
433    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
434        self.encoder.as_mut().unwrap().write(buf)
435    }
436
437    fn flush(&mut self) -> io::Result<()> {
438        self.encoder.as_mut().unwrap().flush()
439    }
440}
441
442impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
443    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
444        f.debug_struct("FrameEncoder")
445            .field("w", &self.w)
446            .field("frame_info", &self.frame_info)
447            .field("is_frame_open", &self.is_frame_open)
448            .field("content_hasher", &self.content_hasher)
449            .field("content_len", &self.content_len)
450            .field("dst", &"[...]")
451            .field("src", &"[...]")
452            .field("src_start", &self.src_start)
453            .field("src_end", &self.src_end)
454            .field("ext_dict_offset", &self.ext_dict_offset)
455            .field("ext_dict_len", &self.ext_dict_len)
456            .field("src_stream_offset", &self.src_stream_offset)
457            .finish()
458    }
459}
460
/// Writes `src` into `target` beginning at index `target_start`: bytes that already
/// exist from that index on are overwritten, whatever does not fit is appended.
#[inline]
fn vec_copy_overwriting(target: &mut Vec<u8>, target_start: usize, src: &[u8]) {
    debug_assert!(target_start + src.len() <= target.capacity());

    // Overwriting (copy_from_slice) the initialized part and extending
    // (extend_from_slice) with the rest lets the ring buffer be filled without
    // initializing it first (eg. filling with 0).
    let in_place = src.len().min(target.len() - target_start);
    let (head, tail) = src.split_at(in_place);
    target[target_start..][..in_place].copy_from_slice(head);
    target.extend_from_slice(tail);
}