1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/*!
This module provides a `std::io::Write` implementation:

- `write::FrameEncoder` wraps another `std::io::Write` implemenation, and
  compresses data encoded using the Snappy frame format. Use this if you have
  uncompressed data source and wish to write it as compressed data.

It would also be possible to provide a `write::FrameDecoder`, which decompresses
data as it writes it, but it hasn't been implemented yet.
*/

use std::fmt;
use std::io::{self, Write};

use crate::compress::Encoder;
use crate::crc32::CheckSummer;
pub use crate::error::IntoInnerError;
use crate::frame::{
    compress_frame, CHUNK_HEADER_AND_CRC_SIZE, MAX_COMPRESS_BLOCK_SIZE,
    STREAM_IDENTIFIER,
};
use crate::MAX_BLOCK_SIZE;

/// A writer for compressing a Snappy stream.
///
/// This `FrameEncoder` wraps any other writer that implements `io::Write`.
/// Bytes written to this writer are compressed using the [Snappy frame
/// format](https://github.com/google/snappy/blob/master/framing_format.txt)
/// (file extension `sz`, MIME type `application/x-snappy-framed`).
///
/// Writes are buffered automatically, so there's no need to wrap the given
/// writer in a `std::io::BufWriter`.
///
/// The writer will be flushed automatically when it is dropped. If an error
/// occurs, it is ignored.
pub struct FrameEncoder<W: io::Write> {
    /// Our main internal state, split out for borrowck reasons (happily paid).
    ///
    /// Also, it's an `Option` so we can move out of it even though
    /// `FrameEncoder` impls `Drop`.
    inner: Option<Inner<W>>,
    /// Our buffer of uncompressed bytes. This isn't part of `inner` because
    /// we may write bytes directly from the caller if the given buffer was
    /// big enough. As a result, the main `write` implementation needs to
    /// accept either the internal buffer or the caller's bytes directly. Since
    /// `write` requires a mutable borrow, we satisfy the borrow checker by
    /// separating `src` from the rest of the state.
    src: Vec<u8>,
}

struct Inner<W> {
    /// The underlying writer.
    w: W,
    /// An encoder that we reuse that does the actual block based compression.
    enc: Encoder,
    /// A CRC32 checksummer that is configured to either use the portable
    /// fallback version or the SSE4.2 accelerated version when the right CPU
    /// features are available.
    checksummer: CheckSummer,
    /// The compressed bytes buffer. Bytes are compressed from src (usually)
    /// to dst before being written to w.
    dst: Vec<u8>,
    /// When false, the stream identifier (with magic bytes) must precede the
    /// next write.
    wrote_stream_ident: bool,
    /// Space for writing the header of a chunk before writing it to the
    /// underlying writer.
    chunk_header: [u8; 8],
}

impl<W: io::Write> FrameEncoder<W> {
    /// Create a new writer for streaming Snappy compression.
    pub fn new(wtr: W) -> FrameEncoder<W> {
        FrameEncoder {
            inner: Some(Inner {
                w: wtr,
                enc: Encoder::new(),
                checksummer: CheckSummer::new(),
                dst: vec![0; MAX_COMPRESS_BLOCK_SIZE],
                wrote_stream_ident: false,
                chunk_header: [0; CHUNK_HEADER_AND_CRC_SIZE],
            }),
            src: Vec::with_capacity(MAX_BLOCK_SIZE),
        }
    }

    /// Returns the underlying stream, consuming and flushing this writer.
    ///
    /// If flushing the writer caused an error, then an `IntoInnerError` is
    /// returned, which contains both the writer and the original writer.
    pub fn into_inner(mut self) -> Result<W, IntoInnerError<FrameEncoder<W>>> {
        match self.flush() {
            Ok(()) => Ok(self.inner.take().unwrap().w),
            Err(err) => Err(IntoInnerError::new(self, err)),
        }
    }

    /// Gets a reference to the underlying writer in this encoder.
    pub fn get_ref(&self) -> &W {
        &self.inner.as_ref().unwrap().w
    }

    /// Gets a reference to the underlying writer in this encoder.
    ///
    /// Note that mutating the output/input state of the stream may corrupt
    /// this encoder, so care must be taken when using this method.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.inner.as_mut().unwrap().w
    }
}

impl<W: io::Write> Drop for FrameEncoder<W> {
    fn drop(&mut self) {
        if self.inner.is_some() {
            // Ignore errors because we can't conceivably return an error and
            // panicing in a dtor is bad juju.
            let _ = self.flush();
        }
    }
}

impl<W: io::Write> io::Write for FrameEncoder<W> {
    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
        let mut total = 0;
        // If there isn't enough room to add buf to src, then add only a piece
        // of it, flush it and mush on.
        loop {
            let free = self.src.capacity() - self.src.len();
            // n is the number of bytes extracted from buf.
            let n = if buf.len() <= free {
                break;
            } else if self.src.is_empty() {
                // If buf is bigger than our entire buffer then avoid
                // the indirection and write the buffer directly.
                self.inner.as_mut().unwrap().write(buf)?
            } else {
                self.src.extend_from_slice(&buf[0..free]);
                self.flush()?;
                free
            };
            buf = &buf[n..];
            total += n;
        }
        // We're only here if buf.len() will fit within the available space of
        // self.src.
        debug_assert!(buf.len() <= (self.src.capacity() - self.src.len()));
        self.src.extend_from_slice(buf);
        total += buf.len();
        // We should never expand or contract self.src.
        debug_assert!(self.src.capacity() == MAX_BLOCK_SIZE);
        Ok(total)
    }

    fn flush(&mut self) -> io::Result<()> {
        if self.src.is_empty() {
            return Ok(());
        }
        self.inner.as_mut().unwrap().write(&self.src)?;
        self.src.truncate(0);
        Ok(())
    }
}

impl<W: io::Write> Inner<W> {
    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
        let mut total = 0;
        if !self.wrote_stream_ident {
            self.wrote_stream_ident = true;
            self.w.write_all(STREAM_IDENTIFIER)?;
        }
        while !buf.is_empty() {
            // Advance buf and get our block.
            let mut src = buf;
            if src.len() > MAX_BLOCK_SIZE {
                src = &src[0..MAX_BLOCK_SIZE];
            }
            buf = &buf[src.len()..];

            let frame_data = compress_frame(
                &mut self.enc,
                self.checksummer,
                src,
                &mut self.chunk_header,
                &mut self.dst,
                false,
            )?;
            self.w.write_all(&self.chunk_header)?;
            self.w.write_all(frame_data)?;
            total += src.len();
        }
        Ok(total)
    }
}

impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("FrameEncoder")
            .field("inner", &self.inner)
            .field("src", &"[...]")
            .finish()
    }
}

impl<W: fmt::Debug + io::Write> fmt::Debug for Inner<W> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Inner")
            .field("w", &self.w)
            .field("enc", &self.enc)
            .field("checksummer", &self.checksummer)
            .field("dst", &"[...]")
            .field("wrote_stream_ident", &self.wrote_stream_ident)
            .field("chunk_header", &self.chunk_header)
            .finish()
    }
}