hdrhistogram/serialization/
v2_serializer.rs

1use super::{Serializer, V2_COOKIE, V2_HEADER_SIZE};
2use crate::{Counter, Histogram};
3use byteorder::{BigEndian, WriteBytesExt};
4use std::io::{self, Write};
5use std::{error, fmt};
6
7/// Errors that occur during serialization.
8#[derive(Debug)]
9pub enum V2SerializeError {
10    /// A count above i64::max_value() cannot be zig-zag encoded, and therefore cannot be
11    /// serialized.
12    CountNotSerializable,
13    /// Internal calculations cannot be represented in `usize`. Use smaller histograms or beefier
14    /// hardware.
15    UsizeTypeTooSmall,
16    /// An i/o operation failed.
17    IoError(io::Error),
18}
19
20impl std::convert::From<std::io::Error> for V2SerializeError {
21    fn from(e: std::io::Error) -> Self {
22        V2SerializeError::IoError(e)
23    }
24}
25
26impl fmt::Display for V2SerializeError {
27    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
28        match self {
29            V2SerializeError::CountNotSerializable => write!(
30                f,
31                "A count above i64::max_value() cannot be zig-zag encoded"
32            ),
33            V2SerializeError::UsizeTypeTooSmall => {
34                write!(f, "Internal calculations cannot be represented in `usize`")
35            }
36            V2SerializeError::IoError(e) => write!(f, "An i/o operation failed: {}", e),
37        }
38    }
39}
40
41impl error::Error for V2SerializeError {
42    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
43        match self {
44            V2SerializeError::IoError(e) => Some(e),
45            _ => None,
46        }
47    }
48}
49
50/// Serializer for the V2 binary format.
51pub struct V2Serializer {
52    buf: Vec<u8>,
53}
54
55impl Default for V2Serializer {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60impl V2Serializer {
61    /// Create a new serializer.
62    pub fn new() -> V2Serializer {
63        V2Serializer { buf: Vec::new() }
64    }
65}
66
67impl Serializer for V2Serializer {
68    type SerializeError = V2SerializeError;
69
70    fn serialize<T: Counter, W: Write>(
71        &mut self,
72        h: &Histogram<T>,
73        writer: &mut W,
74    ) -> Result<usize, V2SerializeError> {
75        // TODO benchmark encoding directly into target Vec
76
77        self.buf.clear();
78        let max_size = max_encoded_size(h).ok_or(V2SerializeError::UsizeTypeTooSmall)?;
79        self.buf.reserve(max_size);
80
81        self.buf.write_u32::<BigEndian>(V2_COOKIE)?;
82        // placeholder for length
83        self.buf.write_u32::<BigEndian>(0)?;
84        // normalizing index offset
85        self.buf.write_u32::<BigEndian>(0)?;
86        self.buf
87            .write_u32::<BigEndian>(u32::from(h.significant_value_digits))?;
88        self.buf
89            .write_u64::<BigEndian>(h.lowest_discernible_value)?;
90        self.buf.write_u64::<BigEndian>(h.highest_trackable_value)?;
91        // int to double conversion
92        self.buf.write_f64::<BigEndian>(1.0)?;
93
94        debug_assert_eq!(V2_HEADER_SIZE, self.buf.len());
95
96        unsafe {
97            // want to treat the rest of the vec as a slice, and we've already reserved this
98            // space, so this way we don't have to resize() on a lot of dummy bytes.
99            self.buf.set_len(max_size);
100        }
101
102        let counts_len = encode_counts(h, &mut self.buf[V2_HEADER_SIZE..])?;
103        // addition should be safe as max_size is already a usize
104        let total_len = V2_HEADER_SIZE + counts_len;
105
106        // TODO benchmark fastest buffer management scheme
107        // counts is always under 2^24
108        (&mut self.buf[4..8]).write_u32::<BigEndian>(counts_len as u32)?;
109
110        writer
111            .write_all(&self.buf[0..(total_len)])
112            .map(|_| total_len)
113            .map_err(V2SerializeError::IoError)
114    }
115}
116
117fn max_encoded_size<T: Counter>(h: &Histogram<T>) -> Option<usize> {
118    h.index_for(h.max())
119        .and_then(|i| counts_array_max_encoded_size(i + 1))
120        .and_then(|x| x.checked_add(V2_HEADER_SIZE))
121}
122
123// Only public for testing.
124pub fn counts_array_max_encoded_size(length: usize) -> Option<usize> {
125    // LEB128-64b9B uses at most 9 bytes
126    // Won't overflow (except sometimes on 16 bit systems) because largest possible counts
127    // len is 47 buckets, each with 2^17 half count, for a total of 6e6. This product will
128    // therefore be about 5e7 (50 million) at most.
129    length.checked_mul(9)
130}
131
132// Only public for testing.
133/// Encode counts array into slice.
134/// The slice must be at least 9 * the number of counts that will be encoded.
135pub fn encode_counts<T: Counter>(
136    h: &Histogram<T>,
137    buf: &mut [u8],
138) -> Result<usize, V2SerializeError> {
139    let index_limit = h
140        .index_for(h.max())
141        .expect("Index for max value must exist");
142    let mut index = 0;
143    let mut bytes_written = 0;
144
145    assert!(index_limit <= h.counts.len());
146
147    while index <= index_limit {
148        // index is inside h.counts because of the assert above
149        let count = unsafe { *(h.counts.get_unchecked(index)) };
150        index += 1;
151
152        // Non-negative values are counts for the respective value, negative values are skipping
153        // that many (absolute value) zero-count values.
154
155        let mut zero_count = 0;
156        if count == T::zero() {
157            zero_count = 1;
158
159            // index is inside h.counts because of the assert above
160            while (index <= index_limit)
161                && (unsafe { *(h.counts.get_unchecked(index)) } == T::zero())
162            {
163                zero_count += 1;
164                index += 1;
165            }
166        }
167
168        let count_or_zeros: i64 = if zero_count > 1 {
169            // zero count can be at most the entire counts array, which is at most 2^24, so will
170            // fit.
171            -zero_count
172        } else {
173            // TODO while writing tests that serialize random counts, this was annoying.
174            // Don't want to silently cap them at i64::max_value() for users that, say, aren't
175            // serializing. Don't want to silently eat counts beyond i63 max when serializing.
176            // Perhaps we should provide some sort of pluggability here -- choose whether you want
177            // to truncate counts to i63 max, or report errors if you need maximum fidelity?
178            count
179                .to_i64()
180                .ok_or(V2SerializeError::CountNotSerializable)?
181        };
182
183        let zz = zig_zag_encode(count_or_zeros);
184
185        // this can't be longer than the length of `buf`, so this won't overflow `usize`
186        bytes_written += varint_write(zz, &mut buf[bytes_written..]);
187    }
188
189    Ok(bytes_written)
190}
191
192// Only public for testing.
193/// Write a number as a LEB128-64b9B little endian base 128 varint to buf. This is not
194/// quite the same as Protobuf's LEB128 as it encodes 64 bit values in a max of 9 bytes, not 10.
195/// The first 8 7-bit chunks are encoded normally (up through the first 7 bytes of input). The last
196/// byte is added to the buf as-is. This limits the input to 8 bytes, but that's all we need.
197/// Returns the number of bytes written (in [1, 9]).
198#[inline]
199pub fn varint_write(input: u64, buf: &mut [u8]) -> usize {
200    // The loop is unrolled because the special case is awkward to express in a loop, and it
201    // probably makes the branch predictor happier to do it this way.
202    // This way about twice as fast as the other "obvious" approach: a sequence of `if`s to detect
203    // size directly with each branch encoding that number completely and returning.
204
205    if shift_by_7s(input, 1) == 0 {
206        buf[0] = input as u8;
207        return 1;
208    }
209    // set high bit because more bytes are coming, then next 7 bits of value.
210    buf[0] = 0x80 | ((input & 0x7F) as u8);
211    if shift_by_7s(input, 2) == 0 {
212        // All zero above bottom 2 chunks, this is the last byte, so no high bit
213        buf[1] = shift_by_7s(input, 1) as u8;
214        return 2;
215    }
216    buf[1] = nth_7b_chunk_with_high_bit(input, 1);
217    if shift_by_7s(input, 3) == 0 {
218        buf[2] = shift_by_7s(input, 2) as u8;
219        return 3;
220    }
221    buf[2] = nth_7b_chunk_with_high_bit(input, 2);
222    if shift_by_7s(input, 4) == 0 {
223        buf[3] = shift_by_7s(input, 3) as u8;
224        return 4;
225    }
226    buf[3] = nth_7b_chunk_with_high_bit(input, 3);
227    if shift_by_7s(input, 5) == 0 {
228        buf[4] = shift_by_7s(input, 4) as u8;
229        return 5;
230    }
231    buf[4] = nth_7b_chunk_with_high_bit(input, 4);
232    if shift_by_7s(input, 6) == 0 {
233        buf[5] = shift_by_7s(input, 5) as u8;
234        return 6;
235    }
236    buf[5] = nth_7b_chunk_with_high_bit(input, 5);
237    if shift_by_7s(input, 7) == 0 {
238        buf[6] = shift_by_7s(input, 6) as u8;
239        return 7;
240    }
241    buf[6] = nth_7b_chunk_with_high_bit(input, 6);
242    if shift_by_7s(input, 8) == 0 {
243        buf[7] = shift_by_7s(input, 7) as u8;
244        return 8;
245    }
246    buf[7] = nth_7b_chunk_with_high_bit(input, 7);
247    // special case: write last whole byte as is
248    buf[8] = (input >> 56) as u8;
249    9
250}
251
252/// input: a u64
253/// n: >0, how many 7-bit shifts to do
254/// Returns the input shifted over by `n` groups of 7 bits.
255#[inline]
256fn shift_by_7s(input: u64, n: u8) -> u64 {
257    input >> (7 * n)
258}
259
260/// input: a u64
261/// n: >0, how many 7-bit shifts to do
262/// Returns the n'th chunk (starting from least significant) of 7 bits as a byte.
263/// The high bit in the byte will be set (not one of the 7 bits that map to input bits).
264#[inline]
265fn nth_7b_chunk_with_high_bit(input: u64, n: u8) -> u8 {
266    (shift_by_7s(input, n) as u8) | 0x80
267}
268
269// Only public for testing.
270/// Map signed numbers to unsigned: 0 to 0, -1 to 1, 1 to 2, -2 to 3, etc
271#[inline]
272pub fn zig_zag_encode(num: i64) -> u64 {
273    // If num < 0, num >> 63 is all 1 and vice versa.
274    ((num << 1) ^ (num >> 63)) as u64
275}