1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
/*!
This module provides a `std::io::Write` implementation:
- `write::FrameEncoder` wraps another `std::io::Write` implementation, and
compresses the data written to it using the Snappy frame format. Use this if
you have an uncompressed data source and wish to write it as compressed data.
It would also be possible to provide a `write::FrameDecoder`, which decompresses
data as it writes it, but it hasn't been implemented yet.
*/
use std::fmt;
use std::io::{self, Write};
use crate::compress::Encoder;
use crate::crc32::CheckSummer;
pub use crate::error::IntoInnerError;
use crate::frame::{
compress_frame, CHUNK_HEADER_AND_CRC_SIZE, MAX_COMPRESS_BLOCK_SIZE,
STREAM_IDENTIFIER,
};
use crate::MAX_BLOCK_SIZE;
/// A writer for compressing a Snappy stream.
///
/// This `FrameEncoder` wraps any other writer that implements `io::Write`.
/// Bytes written to this writer are compressed using the [Snappy frame
/// format](https://github.com/google/snappy/blob/master/framing_format.txt)
/// (file extension `sz`, MIME type `application/x-snappy-framed`).
///
/// Writes are buffered automatically, so there's no need to wrap the given
/// writer in a `std::io::BufWriter`.
///
/// The writer will be flushed automatically when it is dropped. If an error
/// occurs during that implicit flush, it is silently ignored; call `flush` or
/// `into_inner` before dropping the encoder if the error must be observed.
pub struct FrameEncoder<W: io::Write> {
/// Our main internal state, split out from `src` so that the two can be
/// borrowed independently (see the note on `src`).
///
/// Also, it's an `Option` so we can move out of it even though
/// `FrameEncoder` impls `Drop`. It is `Some` from construction until
/// `into_inner` takes the value out.
inner: Option<Inner<W>>,
/// Our buffer of uncompressed bytes. This isn't part of `inner` because
/// we may write bytes directly from the caller if the given buffer was
/// big enough. As a result, the main `write` implementation needs to
/// accept either the internal buffer or the caller's bytes directly. Since
/// `write` requires a mutable borrow, we satisfy the borrow checker by
/// separating `src` from the rest of the state.
///
/// It is created in `new` with a capacity of `MAX_BLOCK_SIZE` and is never
/// meant to grow beyond that.
src: Vec<u8>,
}
/// The compression state of a `FrameEncoder`: the wrapped writer plus the
/// reusable scratch space needed to turn uncompressed bytes into frames.
struct Inner<W> {
/// The underlying writer.
w: W,
/// An encoder that we reuse that does the actual block based compression.
enc: Encoder,
/// A CRC32 checksummer that is configured to either use the portable
/// fallback version or the SSE4.2 accelerated version when the right CPU
/// features are available.
checksummer: CheckSummer,
/// The compressed bytes buffer. Bytes are compressed from src (usually)
/// to dst before being written to w. `FrameEncoder::new` allocates it once
/// with `MAX_COMPRESS_BLOCK_SIZE` zeroed bytes.
dst: Vec<u8>,
/// When false, the stream identifier (with magic bytes) must precede the
/// next write. `Inner::write` sets it to true right before emitting the
/// identifier.
wrote_stream_ident: bool,
/// Space for writing the header of a chunk before writing it to the
/// underlying writer.
///
/// NOTE: the literal length 8 has to stay equal to
/// `CHUNK_HEADER_AND_CRC_SIZE`, which `FrameEncoder::new` uses to
/// initialize this field.
chunk_header: [u8; 8],
}
impl<W: io::Write> FrameEncoder<W> {
    /// Create a new writer for streaming Snappy compression.
    ///
    /// Both working buffers are allocated here: the compressed-output buffer
    /// is filled with `MAX_COMPRESS_BLOCK_SIZE` zero bytes and the
    /// uncompressed staging buffer reserves `MAX_BLOCK_SIZE` bytes. Nothing
    /// is written to `wtr` by this call.
    pub fn new(wtr: W) -> FrameEncoder<W> {
        let inner = Inner {
            w: wtr,
            enc: Encoder::new(),
            checksummer: CheckSummer::new(),
            dst: vec![0; MAX_COMPRESS_BLOCK_SIZE],
            wrote_stream_ident: false,
            chunk_header: [0; CHUNK_HEADER_AND_CRC_SIZE],
        };
        let src = Vec::with_capacity(MAX_BLOCK_SIZE);
        FrameEncoder { inner: Some(inner), src }
    }

    /// Returns the underlying stream, consuming and flushing this writer.
    ///
    /// # Errors
    ///
    /// If flushing the writer fails, an `IntoInnerError` is returned. It is
    /// built from both the flush error and this encoder, so the caller does
    /// not lose the writer.
    pub fn into_inner(mut self) -> Result<W, IntoInnerError<FrameEncoder<W>>> {
        if let Err(err) = self.flush() {
            return Err(IntoInnerError::new(self, err));
        }
        // Taking `inner` leaves `None` behind, which tells `Drop` that there
        // is nothing left to flush.
        let inner = self.inner.take().unwrap();
        Ok(inner.w)
    }

    /// Gets a shared reference to the underlying writer in this encoder.
    pub fn get_ref(&self) -> &W {
        let inner = self.inner.as_ref().unwrap();
        &inner.w
    }

    /// Gets a mutable reference to the underlying writer in this encoder.
    ///
    /// Note that mutating the output/input state of the stream may corrupt
    /// this encoder, so care must be taken when using this method.
    pub fn get_mut(&mut self) -> &mut W {
        let inner = self.inner.as_mut().unwrap();
        &mut inner.w
    }
}
impl<W: io::Write> Drop for FrameEncoder<W> {
    /// Makes a best-effort attempt to flush any buffered bytes.
    fn drop(&mut self) {
        // `into_inner` has already flushed and taken the state; nothing to do.
        if self.inner.is_none() {
            return;
        }
        // A destructor has no way to report a failure, and panicking while
        // dropping is worse than losing the error, so the result is discarded.
        let _ = self.flush();
    }
}
impl<W: io::Write> io::Write for FrameEncoder<W> {
    /// Accepts all of `buf`, returning `buf.len()` on success.
    ///
    /// Input that fits into the free space of the internal staging buffer is
    /// only copied there; it reaches the underlying writer once the buffer
    /// fills up or `flush` is called. Input that does not fit is handled in
    /// one of two ways:
    ///
    /// * if the staging buffer is empty, `buf` is compressed and written
    ///   directly, skipping the intermediate copy;
    /// * otherwise the staging buffer is topped up from `buf`, compressed and
    ///   written out, and the remainder of `buf` is processed again.
    ///
    /// # Errors
    ///
    /// Any error from compressing or from the underlying writer is returned
    /// as is. When an error is returned, part of `buf` may already have been
    /// buffered or written.
    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
        let mut total = 0;
        loop {
            let free = self.src.capacity() - self.src.len();
            if buf.len() <= free {
                break;
            }
            // n is the number of bytes extracted from buf.
            let n = if self.src.is_empty() {
                // buf is bigger than our entire buffer, so avoid the
                // indirection and compress straight from the caller's bytes.
                self.inner.as_mut().unwrap().write(buf)?
            } else {
                // Fill the staging buffer to the brim and drain it. This
                // deliberately does not go through `flush`, which would also
                // flush the underlying writer on every full buffer.
                self.src.extend_from_slice(&buf[..free]);
                self.inner.as_mut().unwrap().write(&self.src)?;
                self.src.clear();
                free
            };
            buf = &buf[n..];
            total += n;
        }
        // We're only here if buf.len() will fit within the available space of
        // self.src.
        debug_assert!(buf.len() <= (self.src.capacity() - self.src.len()));
        self.src.extend_from_slice(buf);
        total += buf.len();
        // We should never expand or contract self.src.
        debug_assert!(self.src.capacity() == MAX_BLOCK_SIZE);
        Ok(total)
    }

    /// Compresses and writes any staged bytes, then flushes the underlying
    /// writer.
    ///
    /// The underlying writer is flushed even when nothing is staged, so that
    /// frames handed to a buffering writer by earlier calls actually reach
    /// their destination, as the `io::Write::flush` contract requires.
    ///
    /// # Errors
    ///
    /// Returns the first error from compressing, writing, or flushing. If
    /// writing the staged bytes fails, they are kept in the staging buffer.
    fn flush(&mut self) -> io::Result<()> {
        let inner = self.inner.as_mut().unwrap();
        if !self.src.is_empty() {
            inner.write(&self.src)?;
            self.src.clear();
        }
        inner.w.flush()
    }
}
impl<W: io::Write> Inner<W> {
    /// Compresses all of `buf` and writes it to the underlying writer,
    /// returning `buf.len()` on success.
    ///
    /// The first call emits `STREAM_IDENTIFIER` before anything else. The
    /// input is then cut into blocks of at most `MAX_BLOCK_SIZE` bytes; for
    /// each block the chunk header is written, followed by the frame data
    /// produced by `compress_frame`.
    ///
    /// # Errors
    ///
    /// Stops at the first error from `compress_frame` or the underlying
    /// writer. Blocks written before the failure are not rolled back.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if !self.wrote_stream_ident {
            // The flag is raised before the write is attempted, so the
            // identifier is attempted at most once per encoder.
            self.wrote_stream_ident = true;
            self.w.write_all(STREAM_IDENTIFIER)?;
        }
        for block in buf.chunks(MAX_BLOCK_SIZE) {
            // `compress_frame` fills in `chunk_header` and hands back the
            // bytes that make up the body of the frame.
            let frame_data = compress_frame(
                &mut self.enc,
                self.checksummer,
                block,
                &mut self.chunk_header,
                &mut self.dst,
                false,
            )?;
            self.w.write_all(&self.chunk_header)?;
            self.w.write_all(frame_data)?;
        }
        // Every block was consumed in full, so the total is the input length.
        Ok(buf.len())
    }
}
impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
    /// Formats the encoder as a struct, showing `inner` in full and eliding
    /// the contents of the `src` buffer behind a `"[...]"` placeholder.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("FrameEncoder");
        out.field("inner", &self.inner);
        out.field("src", &"[...]");
        out.finish()
    }
}
impl<W: fmt::Debug + io::Write> fmt::Debug for Inner<W> {
    /// Formats every field of the state except the contents of the `dst`
    /// buffer, which are elided behind a `"[...]"` placeholder.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Inner");
        out.field("w", &self.w);
        out.field("enc", &self.enc);
        out.field("checksummer", &self.checksummer);
        out.field("dst", &"[...]");
        out.field("wrote_stream_ident", &self.wrote_stream_ident);
        out.field("chunk_header", &self.chunk_header);
        out.finish()
    }
}