differential_dataflow/trace/implementations/huffman_container.rs

1//! A slice container that Huffman encodes its contents.
2
3use std::collections::BTreeMap;
4use timely::container::PushInto;
5
6use crate::trace::implementations::{BatchContainer, OffsetList};
7
8use self::wrapper::Wrapped;
9use self::encoded::Encoded;
10use self::huffman::Huffman;
11
12/// A container that contains slices `[B]` as items.
13pub struct HuffmanContainer<B: Ord+Clone> {
14    /// Either encoded data or raw data.
15    inner: Result<(Huffman<B>, Vec<u8>), Vec<B>>,
16    /// Offsets that bound each contained slice.
17    ///
18    /// The length will be one greater than the number of contained items.
19    offsets: OffsetList,
20    /// Counts of the number of each pattern we've seen.
21    stats: BTreeMap<B, i64>
22}
23
24impl<B: Ord + Clone> HuffmanContainer<B> {
25    /// Prints statistics about encoded containers.
26    pub fn print(&self) {
27        if let Ok((_huff, bytes)) = &self.inner {
28            println!("Bytes: {:?}, Symbols: {:?}", bytes.len(), self.stats.values().sum::<i64>());
29        }
30    }
31}
32
33impl<'a, B: Ord + Clone + 'static> PushInto<&'a Vec<B>> for HuffmanContainer<B> {
34    fn push_into(&mut self, item: &'a Vec<B>) {
35        for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
36        match &mut self.inner {
37            Ok((huffman, bytes)) => {
38                bytes.extend(huffman.encode(item.iter()));
39                self.offsets.push(bytes.len());
40            },
41            Err(raw) => {
42                raw.extend(item.iter().cloned());
43                self.offsets.push(raw.len());
44            }
45        }
46    }
47}
48
49impl<'a, B: Ord + Clone + 'static> PushInto<Wrapped<'a, B>> for HuffmanContainer<B> {
50    fn push_into(&mut self, item: Wrapped<'a, B>) {
51        match item.decode() {
52            Ok(decoded) => {
53                for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; }
54
55            },
56            Err(symbols) => {
57                for x in symbols.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
58            }
59        }
60        match (item.decode(), &mut self.inner) {
61            (Ok(decoded), Ok((huffman, bytes))) => {
62                bytes.extend(huffman.encode(decoded));
63                self.offsets.push(bytes.len());
64            }
65            (Ok(decoded), Err(raw)) => {
66                raw.extend(decoded.cloned());
67                self.offsets.push(raw.len());
68            }
69            (Err(symbols), Ok((huffman, bytes))) => {
70                bytes.extend(huffman.encode(symbols.iter()));
71                self.offsets.push(bytes.len());
72            }
73            (Err(symbols), Err(raw)) => {
74                raw.extend(symbols.iter().cloned());
75                self.offsets.push(raw.len());
76            }
77        }
78    }
79}
80
81impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
82    type Owned = Vec<B>;
83    type ReadItem<'a> = Wrapped<'a, B>;
84
85    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
86        match item.decode() {
87            Ok(decode) => decode.cloned().collect(),
88            Err(bytes) => bytes.to_vec(),
89        }
90    }
91    fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
92        other.clear();
93        match item.decode() {
94            Ok(decode) => other.extend(decode.cloned()),
95            Err(bytes) => other.extend_from_slice(bytes),
96        }
97    }
98    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
99
100    fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
101    fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
102
103    fn clear(&mut self) { *self = Self::default(); }
104
105    fn with_capacity(size: usize) -> Self {
106        let mut offsets = OffsetList::with_capacity(size + 1);
107        offsets.push(0);
108        Self {
109            inner: Err(Vec::with_capacity(size)),
110            offsets,
111            stats: Default::default(),
112        }
113    }
114    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
115
116        if cont1.len() > 0 { cont1.print(); }
117        if cont2.len() > 0 { cont2.print(); }
118
119        let mut counts = BTreeMap::default();
120        for (symbol, count) in cont1.stats.iter() {
121            *counts.entry(symbol.clone()).or_insert(0) += count;
122        }
123        for (symbol, count) in cont2.stats.iter() {
124            *counts.entry(symbol.clone()).or_insert(0) += count;
125        }
126
127        let bytes = Vec::with_capacity(counts.values().cloned().sum::<i64>() as usize);
128        let huffman = Huffman::create_from(counts);
129        let inner = Ok((huffman, bytes));
130        // : Err(Vec::with_capacity(length))
131
132        let length = cont1.offsets.len() + cont2.offsets.len() - 2;
133        let mut offsets = OffsetList::with_capacity(length + 1);
134        offsets.push(0);
135        Self {
136            inner,
137            offsets,
138            stats: Default::default(),
139        }
140    }
141    fn index(&self, index: usize) -> Self::ReadItem<'_> {
142        let lower = self.offsets.index(index);
143        let upper = self.offsets.index(index+1);
144        match &self.inner {
145            Ok((huffman, bytes)) => Wrapped::encoded(Encoded::new(huffman, &bytes[lower .. upper])),
146            Err(raw) => Wrapped::decoded(&raw[lower .. upper]),
147        }
148    }
149    fn len(&self) -> usize {
150        self.offsets.len() - 1
151    }
152}
153/// Default implementation introduces a first offset.
154impl<B: Ord+Clone> Default for HuffmanContainer<B> {
155    fn default() -> Self {
156        let mut offsets = OffsetList::with_capacity(1);
157        offsets.push(0);
158        Self {
159            inner: Err(Vec::new()),
160            offsets,
161            stats: Default::default(),
162        }
163    }
164}
165
166mod wrapper {
167
168    use super::Encoded;
169
170    pub struct Wrapped<'a, B: Ord> {
171        pub(crate) inner: Result<Encoded<'a, B>, &'a [B]>,
172    }
173
174    impl<'a, B: Ord> Wrapped<'a, B> {
175        /// Returns either a decoding iterator, or just the bytes themselves.
176        pub fn decode(&'a self) -> Result<impl Iterator<Item=&'a B> + 'a, &'a [B]> {
177            match &self.inner {
178                Ok(encoded) => Ok(encoded.decode()),
179                Err(symbols) => Err(symbols),
180            }
181        }
182        /// A wrapper around an encoded sequence.
183        pub fn encoded(e: Encoded<'a, B>) -> Self { Self { inner: Ok(e) } }
184        /// A wrapper around a decoded sequence.
185        pub fn decoded(d: &'a [B]) -> Self { Self { inner: Err(d) } }
186    }
187
188    impl<'a, B: Ord> Copy for Wrapped<'a, B> { }
189    impl<'a, B: Ord> Clone for Wrapped<'a, B> {
190        fn clone(&self) -> Self { *self }
191    }
192
193    use std::cmp::Ordering;
194    impl<'a, 'b, B: Ord> PartialEq<Wrapped<'a, B>> for Wrapped<'b, B> {
195        fn eq(&self, other: &Wrapped<'a, B>) -> bool {
196            match (self.decode(), other.decode()) {
197                (Ok(decode1), Ok(decode2)) => decode1.eq(decode2),
198                (Ok(decode1), Err(bytes2)) => decode1.eq(bytes2.iter()),
199                (Err(bytes1), Ok(decode2)) => bytes1.iter().eq(decode2),
200                (Err(bytes1), Err(bytes2)) => bytes1.eq(bytes2),
201            }
202        }
203    }
204    impl<'a, B: Ord> Eq for Wrapped<'a, B> { }
205    impl<'a, 'b, B: Ord> PartialOrd<Wrapped<'a, B>> for Wrapped<'b, B> {
206        fn partial_cmp(&self, other: &Wrapped<'a, B>) -> Option<Ordering> {
207            match (self.decode(), other.decode()) {
208                (Ok(decode1), Ok(decode2)) => decode1.partial_cmp(decode2),
209                (Ok(decode1), Err(bytes2)) => decode1.partial_cmp(bytes2.iter()),
210                (Err(bytes1), Ok(decode2)) => bytes1.iter().partial_cmp(decode2),
211                (Err(bytes1), Err(bytes2)) => bytes1.partial_cmp(bytes2),
212            }
213        }
214    }
215    impl<'a, B: Ord> Ord for Wrapped<'a, B> {
216        fn cmp(&self, other: &Self) -> Ordering {
217            self.partial_cmp(other).unwrap()
218        }
219    }
220}
221
222/// Wrapper around a Huffman decoder and byte slices, decodeable to a byte sequence.
223mod encoded {
224
225    use super::Huffman;
226
227    /// Welcome to GATs!
228    pub struct Encoded<'a, B: Ord> {
229        /// Text that decorates the data.
230        huffman: &'a Huffman<B>,
231        /// The data itself.
232        bytes: &'a [u8],
233    }
234
235    impl<'a, B: Ord> Encoded<'a, B> {
236        /// Returns either a decoding iterator, or just the bytes themselves.
237        pub fn decode(&'a self) -> impl Iterator<Item=&'a B> + 'a {
238            self.huffman.decode(self.bytes.iter().cloned())
239        }
240        pub fn new(huffman: &'a Huffman<B>, bytes: &'a [u8]) -> Self {
241            Self { huffman, bytes }
242        }
243    }
244
245    impl<'a, B: Ord> Copy for Encoded<'a, B> { }
246    impl<'a, B: Ord> Clone for Encoded<'a, B> {
247        fn clone(&self) -> Self { *self }
248    }
249}
250
/// Huffman codes over an ordered alphabet, with an iterator-based encoder and a table-driven,
/// byte-at-a-time decoder.
mod huffman {

    use std::collections::BTreeMap;
    use std::convert::TryInto;

    use self::decoder::Decoder;
    use self::encoder::Encoder;

    /// The longest supported codeword, in bits.
    ///
    /// The encoder buffers up to 7 not-yet-shipped bits plus one whole codeword in a `u64`.
    const MAX_CODE_BITS: usize = 57;

    /// Encoding and decoding state for Huffman codes.
    ///
    /// An encoded sequence is its codewords packed most significant bit first, zero-padded to a
    /// whole byte, followed by one trailer byte holding the number of padding bits (0 to 7).
    /// The trailer is what lets the decoder tell codeword bits in the last byte from padding.
    pub struct Huffman<T: Ord> {
        /// Per-symbol codewords: an entry `(bits, code)` means the low `bits` bits of `code`
        /// are written out, most significant first. `bits` never exceeds `MAX_CODE_BITS`.
        encode: BTreeMap<T, (usize, u64)>,
        /// Byte-by-byte decoding table.
        decode: [Decode<T>; 256],
    }
    impl<T: Ord> Huffman<T> {

        /// Encodes the provided symbols as a sequence of bytes.
        ///
        /// The output is the packed codewords, zero-padded to a whole byte, then one trailer
        /// byte counting the padding bits; an empty input encodes to the single byte `0`.
        /// The returned iterator panics if it meets a symbol without a codeword.
        pub fn encode<'a, I>(&'a self, symbols: I) -> Encoder<'a, T, I::IntoIter>
        where
            I: IntoIterator<Item = &'a T>,
        {
            Encoder::new(&self.encode, symbols.into_iter())
        }

        /// Decodes bytes produced by `encode` back into the symbol sequence.
        ///
        /// Decoding ends early, without error, if the bytes stop partway through a codeword.
        pub fn decode<I>(&self, bytes: I) -> Decoder<'_, T, I::IntoIter>
        where
            I: IntoIterator<Item=u8>
        {
            Decoder::new(&self.decode, bytes.into_iter())
        }

        /// Builds a canonical Huffman code from symbol counts.
        ///
        /// # Panics
        ///
        /// Panics if a codeword would exceed `MAX_CODE_BITS` bits (which takes extremely skewed
        /// counts), or if the resulting decode table is incomplete.
        pub fn create_from(counts: BTreeMap<T, i64>) -> Self where T: Clone {

            if counts.is_empty() {
                return Self {
                    encode: Default::default(),
                    decode: Decode::map(),
                };
            }

            // Repeatedly merge the two least frequent subtrees. Counts are negated because
            // `BinaryHeap` is a max-heap. Merged children move into `tree`; the root goes last.
            let mut heap = std::collections::BinaryHeap::new();
            for (item, count) in counts {
                heap.push((-count, Node::Leaf(item)));
            }
            let mut tree = Vec::with_capacity(2 * heap.len() - 1);
            while heap.len() > 1 {
                let (count1, least1) = heap.pop().unwrap();
                let (count2, least2) = heap.pop().unwrap();
                let fork = Node::Fork(tree.len(), tree.len()+1);
                tree.push(least1);
                tree.push(least2);
                heap.push((count1 + count2, fork));
            }
            tree.push(heap.pop().unwrap().1);

            // A leaf's depth is its code length. A lone symbol would sit at depth zero and be
            // written in zero bits, losing every repetition, so it is given one bit instead.
            let lone_symbol = tree.len() == 1;
            let root_level = if lone_symbol { 1 } else { 0 };
            let mut levels = Vec::with_capacity(1 + tree.len()/2);
            let mut todo = vec![(tree.last().unwrap(), root_level)];
            while let Some((node, level)) = todo.pop() {
                match node {
                    Node::Leaf(sym) => { levels.push((level, sym)); },
                    Node::Fork(l,r) => {
                        todo.push((&tree[*l], level + 1));
                        todo.push((&tree[*r], level + 1));
                    },
                }
            }

            // Canonical assignment: shorter codes first, each code one more than the previous,
            // shifted left whenever the code length grows.
            levels.sort_by_key(|&(level, _)| level);
            let mut code: u64 = 0;
            let mut prev_level = 0;
            let mut encode = BTreeMap::new();
            let mut decode = Decode::map();
            for (level, sym) in levels {
                assert!(
                    level <= MAX_CODE_BITS,
                    "Huffman code length {} exceeds the supported maximum of {} bits",
                    level,
                    MAX_CODE_BITS,
                );
                if prev_level != level {
                    code <<= level - prev_level;
                    prev_level = level;
                }
                encode.insert(sym.clone(), (level, code));
                Self::insert_decode(&mut decode, sym, level, code << (64-level));

                code += 1;
            }

            // A complete code fills every table slot. A lone symbol's code `0` leaves the slots
            // beginning with a `1` bit empty, which valid input never reaches.
            if !lone_symbol {
                for (index, entry) in decode.iter().enumerate() {
                    if entry.any_void() {
                        panic!("VOID FOUND: {:?}", index);
                    }
                }
            }

            Huffman {
                encode,
                decode,
            }
        }

        /// Records `symbol` in the decode table `map`.
        ///
        /// `code` holds the `bits`-bit codeword left-aligned in its 64 bits. A codeword of at
        /// most 8 bits fills every slot whose index begins with it; a longer one descends into
        /// (creating if needed) a nested table indexed by the codeword's next byte.
        fn insert_decode(map: &mut [Decode<T>; 256], symbol: &T, bits: usize, code: u64) where T: Clone {
            let byte: u8 = (code >> 56).try_into().unwrap();
            if bits <= 8 {
                for off in 0 .. (1 << (8 - bits)) {
                    map[(byte as usize) + off] = Decode::Symbol(symbol.clone(), bits);
                }
            }
            else {
                if let Decode::Void = &map[byte as usize] {
                    map[byte as usize] = Decode::Further(Box::new(Decode::map()));
                }
                if let Decode::Further(next_map) = &mut map[byte as usize] {
                    Self::insert_decode(next_map, symbol, bits - 8, code << 8);
                }
            }
        }
    }
    /// Tree structure for Huffman bit length determination.
    ///
    /// A `Fork` names its two children by their indexes in the `tree` vector of `create_from`.
    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug)]
    enum Node<T> {
        Leaf(T),
        Fork(usize, usize),
    }

    /// One slot of a byte-indexed decoding table.
    #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
    pub enum Decode<T> {
        /// An as-yet unfilled slot.
        #[default]
        Void,
        /// The symbol, and the number of bits (of this table's byte) it consumes.
        Symbol(T, usize),
        /// A further table, indexed by the next byte, for codewords longer than this byte.
        Further(Box<[Decode<T>; 256]>),
    }

    impl<T> Decode<T> {
        /// Tests to see if the map contains any invalid values.
        ///
        /// A correctly initialized map will have no invalid values.
        /// A map with invalid values will be unable to decode some
        /// input byte sequences.
        fn any_void(&self) -> bool {
            match self {
                Decode::Void => true,
                Decode::Symbol(_,_) => false,
                Decode::Further(map) => map.iter().any(|m| m.any_void()),
            }
        }
        /// Creates a new map containing invalid values.
        fn map() -> [Decode<T>; 256] {
            let mut vec = Vec::with_capacity(256);
            for _ in 0 .. 256 {
                vec.push(Decode::Void);
            }
            vec.try_into().ok().unwrap()
        }
    }


    /// A tabled Huffman decoder, written as an iterator.
    mod decoder {

        use super::Decode;

        /// Decodes the byte format produced by `Encoder`: packed codewords, zero padding to a
        /// whole byte, then one trailer byte counting the padding bits.
        ///
        /// One byte is always read ahead (`peeked`) so that the final byte of the stream can be
        /// recognized as the trailer rather than data.
        #[derive(Copy, Clone)]
        pub struct Decoder<'a, T, I> {
            decode: &'a [Decode<T>; 256],
            bytes: I,
            /// The next byte of `bytes`, read ahead.
            peeked: Option<u8>,
            /// Buffered, not yet decoded bits, in the low `pending_bits` bits.
            pending_byte: u16,
            pending_bits: usize,
            /// Padding bits at the low end of the buffer; known once the trailer has been seen.
            padding: Option<usize>,
        }

        impl<'a, T, I: Iterator<Item=u8>> Decoder<'a, T, I> {
            pub fn new(decode: &'a [Decode<T>; 256], mut bytes: I) -> Self {
                let peeked = bytes.next();
                Self {
                    decode,
                    bytes,
                    peeked,
                    pending_byte: 0,
                    pending_bits: 0,
                    padding: None,
                }
            }

            /// Moves one data byte into the buffer or, if only the trailer remains, records
            /// the padding count instead.
            fn refill(&mut self) {
                match self.peeked.take() {
                    // No bytes at all, not even a trailer: treat as an empty sequence.
                    None => self.padding = Some(0),
                    Some(byte) => match self.bytes.next() {
                        // `byte` was the last byte of the stream, and so is the trailer.
                        None => self.padding = Some(usize::from(byte)),
                        Some(after) => {
                            self.pending_byte = (self.pending_byte << 8) | u16::from(byte);
                            self.pending_bits += 8;
                            self.peeked = Some(after);
                        }
                    },
                }
            }

            /// Discards the `bits` most significant buffered bits.
            fn consume(&mut self, bits: usize) {
                self.pending_bits -= bits;
                self.pending_byte &= (1u16 << self.pending_bits) - 1;
            }

            /// Decodes from the final fewer-than-eight buffered bits, excluding the padding.
            fn decode_tail(&mut self, map: &'a [Decode<T>; 256]) -> Option<&'a T> {
                let padding = self.padding.unwrap_or(0);
                // A shortfall means decoding ran into the padding: the input was malformed.
                let real = self.pending_bits.checked_sub(padding)?;
                if real == 0 {
                    return None;
                }
                // Left-align the real bits into a table index; the zero-filled low bits do not
                // matter to any codeword of at most `real` bits.
                let index = usize::from(self.pending_byte >> padding) << (8 - real);
                match &map[index] {
                    Decode::Symbol(s, bits) if *bits <= real => {
                        self.consume(*bits);
                        Some(s)
                    }
                    // The real bits end partway through a codeword.
                    _ => None,
                }
            }
        }

        impl<'a, T, I: Iterator<Item=u8>> Iterator for Decoder<'a, T, I> {
            type Item = &'a T;
            fn next(&mut self) -> Option<&'a T> {
                // Navigate `self.decode` eight bits at a time, restocking from the data bytes
                // whenever fewer than eight bits are buffered.
                let mut map = self.decode;
                loop {
                    if self.pending_bits < 8 && self.padding.is_none() {
                        self.refill();
                    }
                    if self.pending_bits < 8 {
                        // Data bytes are exhausted; decode what remains bit-exactly.
                        return self.decode_tail(map);
                    }
                    let byte = (self.pending_byte >> (self.pending_bits - 8)) as usize;
                    match &map[byte] {
                        Decode::Void => { panic!("invalid decoding map"); }
                        Decode::Symbol(s, bits) => {
                            self.consume(*bits);
                            return Some(s);
                        }
                        Decode::Further(next_map) => {
                            self.consume(8);
                            map = next_map;
                        }
                    }
                }
            }
        }
    }

    /// A tabled Huffman encoder, written as an iterator.
    mod encoder {

        use std::collections::BTreeMap;

        /// Yields the packed codewords of `symbols` a byte at a time, then a zero-padded final
        /// fractional byte (if any), then one trailer byte holding the padding bit count.
        #[derive(Copy, Clone)]
        pub struct Encoder<'a, T, I> {
            encode: &'a BTreeMap<T, (usize, u64)>,
            symbols: I,
            /// Buffered, not yet shipped bits, in the low `pending_bits` bits.
            pending_byte: u64,
            pending_bits: usize,
            /// Set once `symbols` has returned `None`; it is not polled again afterwards.
            exhausted: bool,
            /// The trailer byte, queued once `symbols` is exhausted and taken when shipped.
            trailer: Option<u8>,
        }

        impl<'a, T, I> Encoder<'a, T, I> {
            pub fn new(encode: &'a BTreeMap<T, (usize, u64)>, symbols: I) -> Self {
                Self {
                    encode,
                    symbols,
                    pending_byte: 0,
                    pending_bits: 0,
                    exhausted: false,
                    trailer: None,
                }
            }
        }

        impl<'a, T: Ord, I> Iterator for Encoder<'a, T, I>
        where
            I: Iterator<Item=&'a T>,
        {
            type Item = u8;
            fn next(&mut self) -> Option<u8> {
                // Restock `self.pending_byte` from `self.symbols` until a whole byte is available.
                while self.pending_bits < 8 {
                    if self.exhausted {
                        // Only the trailer is left to ship (or nothing, once it has gone).
                        return self.trailer.take();
                    }
                    if let Some(symbol) = self.symbols.next() {
                        let &(bits, code) = self.encode.get(symbol).expect("symbol has no Huffman codeword");
                        self.pending_byte = (self.pending_byte << bits) | code;
                        self.pending_bits += bits;
                    }
                    else {
                        // Out of symbols: zero-pad any final fractional byte, and queue the
                        // trailer recording how many padding bits were added.
                        self.exhausted = true;
                        let padding = (8 - self.pending_bits) % 8;
                        self.trailer = Some(padding as u8);
                        if self.pending_bits > 0 {
                            let byte = self.pending_byte << padding;
                            self.pending_bits = 0;
                            self.pending_byte = 0;
                            return Some(byte as u8);
                        }
                    }
                }

                let byte = self.pending_byte >> (self.pending_bits - 8);
                self.pending_bits -= 8;
                self.pending_byte &= (1u64 << self.pending_bits) - 1;
                Some(byte as u8)
            }
        }
    }

}