hdrhistogram/serialization/
deserializer.rs

1use super::{V2_COMPRESSED_COOKIE, V2_COOKIE};
2use crate::{Counter, Histogram, RestatState};
3use byteorder::{BigEndian, ReadBytesExt};
4use flate2::read::ZlibDecoder;
5use num_traits::ToPrimitive;
6use std::io::{self, Cursor, Read};
7use std::marker::PhantomData;
8use std::{self, error, fmt};
9
/// Everything that can go wrong while turning bytes back into a histogram.
#[derive(Debug)]
pub enum DeserializeError {
    /// The underlying reader reported an i/o failure.
    IoError(io::Error),
    /// The leading 4-byte cookie does not belong to any format this deserializer understands.
    InvalidCookie,
    /// The encoded histogram relies on a feature this implementation does not (yet) handle, so
    /// it cannot be decoded faithfully.
    UnsupportedFeature,
    /// An encoded count does not fit in the counter type the caller asked for.
    UnsuitableCounterType,
    /// The serialized construction parameters (lowest value, highest value, etc.) were rejected
    /// when creating the histogram.
    InvalidParameters,
    /// A length or index in the encoded data cannot be represented as a `usize` on this system.
    UsizeTypeTooSmall,
    /// The encoded counts array has more entries than the histogram's value range allows.
    EncodedArrayTooLong,
}
30
31impl std::convert::From<std::io::Error> for DeserializeError {
32    fn from(e: std::io::Error) -> Self {
33        DeserializeError::IoError(e)
34    }
35}
36
37impl fmt::Display for DeserializeError {
38    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
39        match self {
40            DeserializeError::IoError(e) => write!(f, "An i/o operation failed: {}", e),
41            DeserializeError::InvalidCookie => write!(
42                f,
43                "The cookie (first 4 bytes) did not match that for any supported format"
44            ),
45            DeserializeError::UnsupportedFeature => write!(
46                f,
47                "The histogram uses features that this implementation doesn't support"
48            ),
49            DeserializeError::UnsuitableCounterType => write!(
50                f,
51                "A count exceeded what can be represented in the chosen counter type"
52            ),
53            DeserializeError::InvalidParameters => write!(
54                f,
55                "The serialized parameters were invalid(e.g. lowest value, highest value, etc)"
56            ),
57            DeserializeError::UsizeTypeTooSmall => write!(
58                f,
59                "The current system's pointer width cannot represent the encoded histogram"
60            ),
61            DeserializeError::EncodedArrayTooLong => write!(
62                f,
63                "The encoded array is longer than it should be for the histogram's value range"
64            ),
65        }
66    }
67}
68
69impl error::Error for DeserializeError {
70    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
71        match self {
72            DeserializeError::IoError(e) => Some(e),
73            _ => None,
74        }
75    }
76}
77
/// Deserializer for all supported formats.
///
/// Each serialization format begins with magic bytes that identify it reliably, so a single
/// `Deserializer` type can handle all of them.
///
/// The `Default` value has an empty payload buffer, the same as `Deserializer::new()`.
#[derive(Default)]
pub struct Deserializer {
    /// Scratch buffer that holds the encoded counts payload while it is being decoded.
    payload_buf: Vec<u8>,
}
91
92impl Deserializer {
93    /// Create a new deserializer.
94    pub fn new() -> Deserializer {
95        Deserializer {
96            payload_buf: Vec::new(),
97        }
98    }
99
100    /// Deserialize an encoded histogram from the provided reader.
101    ///
102    /// Note that `&[u8]` and `Cursor` are convenient implementations of `Read` if you have some
103    /// bytes already in slice or `Vec` form.
104    pub fn deserialize<T: Counter, R: Read>(
105        &mut self,
106        reader: &mut R,
107    ) -> Result<Histogram<T>, DeserializeError> {
108        let cookie = reader.read_u32::<BigEndian>()?;
109
110        match cookie {
111            V2_COOKIE => self.deser_v2(reader),
112            V2_COMPRESSED_COOKIE => self.deser_v2_compressed(reader),
113            _ => Err(DeserializeError::InvalidCookie),
114        }
115    }
116
117    fn deser_v2_compressed<T: Counter, R: Read>(
118        &mut self,
119        reader: &mut R,
120    ) -> Result<Histogram<T>, DeserializeError> {
121        let payload_len = reader
122            .read_u32::<BigEndian>()?
123            .to_usize()
124            .ok_or(DeserializeError::UsizeTypeTooSmall)?;
125
126        // TODO reuse deflate buf, or switch to lower-level flate2::Decompress
127        let mut deflate_reader = ZlibDecoder::new(reader.take(payload_len as u64));
128        let inner_cookie = deflate_reader.read_u32::<BigEndian>()?;
129        if inner_cookie != V2_COOKIE {
130            return Err(DeserializeError::InvalidCookie);
131        }
132
133        self.deser_v2(&mut deflate_reader)
134    }
135
136    #[allow(clippy::float_cmp)]
137    fn deser_v2<T: Counter, R: Read>(
138        &mut self,
139        reader: &mut R,
140    ) -> Result<Histogram<T>, DeserializeError> {
141        let payload_len = reader
142            .read_u32::<BigEndian>()?
143            .to_usize()
144            .ok_or(DeserializeError::UsizeTypeTooSmall)?;
145        let normalizing_offset = reader.read_u32::<BigEndian>()?;
146        if normalizing_offset != 0 {
147            return Err(DeserializeError::UnsupportedFeature);
148        }
149        let num_digits = reader
150            .read_u32::<BigEndian>()?
151            .to_u8()
152            .ok_or(DeserializeError::InvalidParameters)?;
153        let low = reader.read_u64::<BigEndian>()?;
154        let high = reader.read_u64::<BigEndian>()?;
155        let int_double_ratio = reader.read_f64::<BigEndian>()?;
156        if int_double_ratio != 1.0 {
157            return Err(DeserializeError::UnsupportedFeature);
158        }
159
160        let mut h = Histogram::new_with_bounds(low, high, num_digits)
161            .map_err(|_| DeserializeError::InvalidParameters)?;
162
163        if payload_len > self.payload_buf.len() {
164            self.payload_buf.resize(payload_len, 0);
165        }
166
167        let mut payload_slice = &mut self.payload_buf[0..payload_len];
168        reader.read_exact(&mut payload_slice)?;
169
170        let mut payload_index: usize = 0;
171        let mut restat_state = RestatState::new();
172        let mut decode_state = DecodeLoopState::new();
173
174        while payload_index < payload_len.saturating_sub(9) {
175            // Read with fast loop until we are within 9 of the end. Fast loop can't handle EOF,
176            // so bail to slow version for the last few bytes.
177
178            // payload_index math is safe because payload_len is a usize
179            let (zz_num, bytes_read) =
180                varint_read_slice(&payload_slice[payload_index..(payload_index + 9)]);
181            payload_index += bytes_read;
182
183            let count_or_zeros = zig_zag_decode(zz_num);
184
185            decode_state.on_decoded_num(count_or_zeros, &mut restat_state, &mut h)?;
186        }
187
188        // Now read the leftovers
189        let leftover_slice = &payload_slice[payload_index..];
190        let mut cursor = Cursor::new(&leftover_slice);
191        while cursor.position() < leftover_slice.len() as u64 {
192            let count_or_zeros = zig_zag_decode(varint_read(&mut cursor)?);
193
194            decode_state.on_decoded_num(count_or_zeros, &mut restat_state, &mut h)?;
195        }
196
197        restat_state.update_histogram(&mut h);
198
199        Ok(h)
200    }
201}
202
203// Only public for testing.
204/// Read from a slice that must be 9 bytes long or longer. Returns the decoded number and how many
205/// bytes were consumed.
206#[inline]
207pub fn varint_read_slice(slice: &[u8]) -> (u64, usize) {
208    let mut b = slice[0];
209
210    // take low 7 bits
211    let mut value: u64 = low_7_bits(b);
212    if !is_high_bit_set(b) {
213        return (value, 1);
214    }
215    // high bit set, keep reading
216    b = slice[1];
217    value |= low_7_bits(b) << 7;
218    if !is_high_bit_set(b) {
219        return (value, 2);
220    }
221    b = slice[2];
222    value |= low_7_bits(b) << (7 * 2);
223    if !is_high_bit_set(b) {
224        return (value, 3);
225    }
226    b = slice[3];
227    value |= low_7_bits(b) << (7 * 3);
228    if !is_high_bit_set(b) {
229        return (value, 4);
230    }
231    b = slice[4];
232    value |= low_7_bits(b) << (7 * 4);
233    if !is_high_bit_set(b) {
234        return (value, 5);
235    }
236    b = slice[5];
237    value |= low_7_bits(b) << (7 * 5);
238    if !is_high_bit_set(b) {
239        return (value, 6);
240    }
241    b = slice[6];
242    value |= low_7_bits(b) << (7 * 6);
243    if !is_high_bit_set(b) {
244        return (value, 7);
245    }
246    b = slice[7];
247    value |= low_7_bits(b) << (7 * 7);
248    if !is_high_bit_set(b) {
249        return (value, 8);
250    }
251
252    b = slice[8];
253    // special case: use last byte as is
254    value |= u64::from(b) << (7 * 8);
255
256    (value, 9)
257}
258
259// Only public for testing.
260/// Read a LEB128-64b9B from the buffer
261pub fn varint_read<R: Read>(reader: &mut R) -> io::Result<u64> {
262    let mut b = reader.read_u8()?;
263
264    // take low 7 bits
265    let mut value: u64 = low_7_bits(b);
266
267    if is_high_bit_set(b) {
268        // high bit set, keep reading
269        b = reader.read_u8()?;
270        value |= low_7_bits(b) << 7;
271        if is_high_bit_set(b) {
272            b = reader.read_u8()?;
273            value |= low_7_bits(b) << (7 * 2);
274            if is_high_bit_set(b) {
275                b = reader.read_u8()?;
276                value |= low_7_bits(b) << (7 * 3);
277                if is_high_bit_set(b) {
278                    b = reader.read_u8()?;
279                    value |= low_7_bits(b) << (7 * 4);
280                    if is_high_bit_set(b) {
281                        b = reader.read_u8()?;
282                        value |= low_7_bits(b) << (7 * 5);
283                        if is_high_bit_set(b) {
284                            b = reader.read_u8()?;
285                            value |= low_7_bits(b) << (7 * 6);
286                            if is_high_bit_set(b) {
287                                b = reader.read_u8()?;
288                                value |= low_7_bits(b) << (7 * 7);
289                                if is_high_bit_set(b) {
290                                    b = reader.read_u8()?;
291                                    // special case: use last byte as is
292                                    value |= u64::from(b) << (7 * 8);
293                                }
294                            }
295                        }
296                    }
297                }
298            }
299        }
300    }
301
302    Ok(value)
303}
304
/// Widen `b` to `u64`, keeping only its low 7 bits (the high bit is dropped).
#[inline]
fn low_7_bits(b: u8) -> u64 {
    u64::from(b) & 0x7F
}
310
/// True when the most significant bit of `b` is 1, i.e. when `b` is 0x80 or greater.
#[inline]
fn is_high_bit_set(b: u8) -> bool {
    b >= 0x80
}
315
// Only public for testing.
/// Undo zig-zag encoding: even inputs map to `encoded / 2`, odd inputs to the bitwise
/// complement of `encoded / 2` (so 0, 1, 2, 3 become 0, -1, 1, -2).
#[inline]
pub fn zig_zag_decode(encoded: u64) -> i64 {
    let magnitude = (encoded >> 1) as i64;

    if encoded & 1 == 0 {
        magnitude
    } else {
        // XOR with -1 (all ones) is a bitwise NOT.
        !magnitude
    }
}
321
322/// We need to perform the same logic in two different decode loops while carrying over a modicum
323/// of state.
324struct DecodeLoopState<T: Counter> {
325    dest_index: usize,
326    phantom: PhantomData<T>,
327}
328
329impl<T: Counter> DecodeLoopState<T> {
330    fn new() -> DecodeLoopState<T> {
331        DecodeLoopState {
332            dest_index: 0,
333            phantom: PhantomData,
334        }
335    }
336
337    #[inline]
338    fn on_decoded_num(
339        &mut self,
340        count_or_zeros: i64,
341        restat_state: &mut RestatState<T>,
342        h: &mut Histogram<T>,
343    ) -> Result<(), DeserializeError> {
344        if count_or_zeros < 0 {
345            // For a valid histogram, negation won't overflow because you can't have anywhere close
346            // to even 2^32 array length
347            let zero_count = (-count_or_zeros)
348                .to_usize()
349                .ok_or(DeserializeError::UsizeTypeTooSmall)?;
350            // skip the zeros
351            self.dest_index = self
352                .dest_index
353                .checked_add(zero_count)
354                .ok_or(DeserializeError::UsizeTypeTooSmall)?;
355        } else {
356            let count: T =
357                T::from_i64(count_or_zeros).ok_or(DeserializeError::UnsuitableCounterType)?;
358
359            if count > T::zero() {
360                h.set_count_at_index(self.dest_index, count)
361                    .map_err(|_| DeserializeError::EncodedArrayTooLong)?;
362
363                restat_state.on_nonzero_count(self.dest_index, count);
364            }
365
366            self.dest_index = self
367                .dest_index
368                .checked_add(1)
369                .ok_or(DeserializeError::UsizeTypeTooSmall)?;
370        }
371
372        Ok(())
373    }
374}