Skip to main content

columnar/
bytes.rs

1//! Logic related to the transformation to and from bytes.
2//!
3//! The methods here line up with the `AsBytes` and `FromBytes` traits.
4
5use crate::AsBytes;
6
7/// A coupled encode/decode pair for byte sequences.
8pub trait EncodeDecode {
9    /// Encoded length in number of `u64` words required.
10    fn length_in_words<'a, A>(bytes: &A) -> usize where A : AsBytes<'a>;
11    /// Encoded length in number of `u8` bytes required.
12    ///
13    /// This method should always be eight times `Self::length_in_words`, and is provided for convenience and clarity.
14    fn length_in_bytes<'a, A>(bytes: &A) -> usize where A : AsBytes<'a> { 8 * Self::length_in_words(bytes) }
15    /// Encodes `bytes` into a sequence of `u64`.
16    fn encode<'a, A>(store: &mut Vec<u64>, bytes: &A) where A : AsBytes<'a>;
17    /// Writes `bytes` in the encoded format to an arbitrary writer.
18    fn write<'a, A, W: std::io::Write>(writer: W, bytes: &A) -> std::io::Result<()> where A : AsBytes<'a>;
19    /// Decodes bytes from a sequence of `u64`.
20    fn decode(store: &[u64]) -> impl Iterator<Item=&[u8]>;
21}
22
23/// A sequential byte layout for `AsBytes` and `FromBytes` implementors.
24///
25/// The layout is aligned like a sequence of `u64`, where we repeatedly announce a length,
26/// and then follow it by that many bytes. We may need to follow this with padding bytes.
27pub use serialization::Sequence;
28mod serialization {
29
30    use crate::AsBytes;
31
32    /// Encodes and decodes bytes sequences, by prepending the length and appending the all sequences.
33    pub struct Sequence;
34    impl super::EncodeDecode for Sequence {
35        fn length_in_words<'a, A>(bytes: &A) -> usize where A : AsBytes<'a> {
36            // Each byte slice has one `u64` for the length, and then as many `u64`s as needed to hold all bytes.
37            bytes.as_bytes().map(|(_align, bytes)| 1 + bytes.len().div_ceil(8)).sum()
38        }
39        fn encode<'a, A>(store: &mut Vec<u64>, bytes: &A) where A : AsBytes<'a> {
40            encode(store, bytes.as_bytes())
41        }
42        fn write<'a, A, W: std::io::Write>(writer: W, bytes: &A) -> std::io::Result<()> where A : AsBytes<'a> {
43            write(writer, bytes.as_bytes())
44        }
45        fn decode(store: &[u64]) -> impl Iterator<Item=&[u8]> {
46            decode(store)
47        }
48    }
49
50    /// Encodes a sequence of byte slices as their length followed by their bytes, aligned to 8 bytes.
51    ///
52    /// Each length will be exactly 8 bytes, and the bytes that follow are padded out to a multiple of 8 bytes.
53    /// When reading the data, the length is in bytes, and one should consume those bytes and advance over padding.
54    pub fn encode<'a>(store: &mut Vec<u64>, bytes: impl Iterator<Item=(u64, &'a [u8])>) {
55        for (align, bytes) in bytes {
56            assert!(align <= 8);
57            store.push(bytes.len() as u64);
58            let whole_words = 8 * (bytes.len() / 8);
59            // We want to extend `store` by `bytes`, but `bytes` may not be `u64` aligned.
60            // In the latter case, init `store` and cast and copy onto it as a byte slice.
61            if let Ok(words) = bytemuck::try_cast_slice(&bytes[.. whole_words]) {
62                store.extend_from_slice(words);
63            }
64            else {
65                let store_len = store.len();
66                store.resize(store_len + whole_words/8, 0);
67                let slice = bytemuck::try_cast_slice_mut(&mut store[store_len..]).expect("&[u64] should convert to &[u8]");
68                slice.copy_from_slice(&bytes[.. whole_words]);
69            }
70            let remaining_bytes = &bytes[whole_words..];
71            if !remaining_bytes.is_empty() {
72                let mut remainder = 0u64;
73                let transmute: &mut [u8] = bytemuck::try_cast_slice_mut(std::slice::from_mut(&mut remainder)).expect("&[u64] should convert to &[u8]");
74                for (i, byte) in remaining_bytes.iter().enumerate() {
75                    transmute[i] = *byte;
76                }
77                store.push(remainder);
78            }
79        }
80    }
81
82    /// Writes a sequence of byte slices as their length followed by their bytes, padded to 8 bytes.
83    ///
84    /// Each length will be exactly 8 bytes, and the bytes that follow are padded out to a multiple of 8 bytes.
85    /// When reading the data, the length is in bytes, and one should consume those bytes and advance over padding.
86    pub fn write<'a>(mut writer: impl std::io::Write, bytes: impl Iterator<Item=(u64, &'a [u8])>) -> std::io::Result<()> {
87        // Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice
88        // serialize as first its length in bytes, and then as many `u64` values as needed.
89        // Padding should be added, but only for alignment; no specific values are required.
90        for (align, bytes) in bytes {
91            assert!(align <= 8);
92            let length = u64::try_from(bytes.len()).unwrap();
93            writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length)))?;
94            writer.write_all(bytes)?;
95            let padding = usize::try_from((8 - (length % 8)) % 8).unwrap();
96            writer.write_all(&[0; 8][..padding])?;
97        }
98        Ok(())
99    }
100
101    /// Decodes a sequence of byte slices from their length followed by their bytes.
102    ///
103    /// This decoder matches the `encode` function above.
104    /// In particular, it anticipates padding bytes when the length is not a multiple of eight.
105    pub fn decode(store: &[u64]) -> Decoder<'_> {
106        Decoder { store }
107    }
108
109    /// An iterator over byte slices, decoding from a sequence of lengths followed by bytes.
110    pub struct Decoder<'a> {
111        store: &'a [u64],
112    }
113
114    impl<'a> Iterator for Decoder<'a> {
115        type Item = &'a [u8];
116        fn next(&mut self) -> Option<Self::Item> {
117            if let Some(length) = self.store.first() {
118                let length = *length as usize;
119                self.store = &self.store[1..];
120                let whole_words = if length % 8 == 0 { length / 8 } else { length / 8 + 1 };
121                let bytes: &[u8] = bytemuck::try_cast_slice(&self.store[..whole_words]).expect("&[u64] should convert to &[u8]");
122                self.store = &self.store[whole_words..];
123                Some(&bytes[..length])
124            } else {
125                None
126            }
127        }
128    }
129}
130
131/// A binary encoding of sequences of byte slices.
132///
133/// The encoding starts with a sequence of n+1 offsets describing where to find the n slices in the bytes that follow.
134/// Treating the offsets as a byte slice too, the each offset indicates the location (in bytes) of the end of its slice.
135/// Each byte slice can be found from a pair of adjacent offsets, where the first is rounded up to a multiple of eight.
136pub use serialization_neu::Indexed;
137pub mod serialization_neu {
138
139    use crate::AsBytes;
140
141    /// Encodes and decodes bytes sequences, using an index of offsets.
142    pub struct Indexed;
143    impl super::EncodeDecode for Indexed {
144        fn length_in_words<'a, A>(bytes: &A) -> usize where A : AsBytes<'a> {
145            1 + bytes.as_bytes().map(|(_align, bytes)| 1 + bytes.len().div_ceil(8)).sum::<usize>()
146        }
147        fn encode<'a, A>(store: &mut Vec<u64>, bytes: &A) where A : AsBytes<'a> {
148            encode(store, bytes)
149        }
150        fn write<'a, A, W: std::io::Write>(writer: W, bytes: &A) -> std::io::Result<()> where A : AsBytes<'a> {
151            write(writer, bytes)
152        }
153        fn decode(store: &[u64]) -> impl Iterator<Item=&[u8]> {
154            decode(store)
155        }
156    }
157
158    /// Encodes `item` into `u64` aligned words.
159    ///
160    /// The sequence of byte slices are appended, with padding to have each slice start `u64` aligned.
161    /// The sequence is then pre-pended with as many byte offsets as there are slices in `item`, plus one.
162    /// The byte offsets indicate where each slice ends, and by rounding up to `u64` alignemnt where the next slice begins.
163    /// The first offset indicates where the list of offsets itself ends, and where the first slice begins.
164    ///
165    /// We will need to visit `as_bytes` three times to extract this information, so the method should be efficient and inlined.
166    /// The first read writes the first offset, the second writes each other offset, and the third writes the bytes themselves.
167    ///
168    /// The offsets are zero-based, rather than based on `store.len()`.
169    /// If you call the method with a non-empty `store` be careful decoding.
170    pub fn encode<'a, A>(store: &mut Vec<u64>, iter: &A)
171    where A : AsBytes<'a>,
172    {
173        // Read 1: Number of offsets we will record, equal to the number of slices plus one.
174        // TODO: right-size `store` before first call to `push`.
175        let offsets = 1 + iter.as_bytes().count();
176        let offsets_end: u64 = TryInto::<u64>::try_into((offsets) * std::mem::size_of::<u64>()).unwrap();
177        store.push(offsets_end);
178        // Read 2: Establish each of the offsets based on lengths of byte slices.
179        let mut position_bytes = offsets_end;
180        for (align, bytes) in iter.as_bytes() {
181            assert!(align <= 8);
182            // Write length in bytes, but round up to words before updating `position_bytes`.
183            let to_push: u64 = position_bytes + TryInto::<u64>::try_into(bytes.len()).unwrap();
184            store.push(to_push);
185            let round_len: u64 = ((bytes.len() + 7) & !7).try_into().unwrap();
186            position_bytes += round_len;
187        }
188        // Read 3: Append each byte slice, with padding to align starts to `u64`.
189        for (_align, bytes) in iter.as_bytes() {
190            let whole_words = 8 * (bytes.len() / 8);
191            // We want to extend `store` by `bytes`, but `bytes` may not be `u64` aligned.
192            // In the latter case, init `store` and cast and copy onto it as a byte slice.
193            if let Ok(words) = bytemuck::try_cast_slice(&bytes[.. whole_words]) {
194                store.extend_from_slice(words);
195            }
196            else {
197                let store_len = store.len();
198                store.resize(store_len + whole_words/8, 0);
199                let slice = bytemuck::try_cast_slice_mut(&mut store[store_len..]).expect("&[u64] should convert to &[u8]");
200                slice.copy_from_slice(&bytes[.. whole_words]);
201            }
202            let remaining_bytes = &bytes[whole_words..];
203            if !remaining_bytes.is_empty() {
204                let mut remainder = 0u64;
205                let transmute: &mut [u8] = bytemuck::try_cast_slice_mut(std::slice::from_mut(&mut remainder)).expect("&[u64] should convert to &[u8]");
206                for (i, byte) in remaining_bytes.iter().enumerate() {
207                    transmute[i] = *byte;
208                }
209                store.push(remainder);
210            }
211        }
212    }
213
214    pub fn write<'a, A, W>(mut writer: W, iter: &A) -> std::io::Result<()>
215    where
216        A: AsBytes<'a>,
217        W: std::io::Write,
218    {
219        // Read 1: Number of offsets we will record, equal to the number of slices plus one.
220        let offsets = 1 + iter.as_bytes().count();
221        let offsets_end: u64 = TryInto::<u64>::try_into((offsets) * std::mem::size_of::<u64>()).unwrap();
222        writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&offsets_end)))?;
223        // Read 2: Establish each of the offsets based on lengths of byte slices.
224        let mut position_bytes = offsets_end;
225        for (align, bytes) in iter.as_bytes() {
226            assert!(align <= 8);
227            // Write length in bytes, but round up to words before updating `position_bytes`.
228            let to_push: u64 = position_bytes + TryInto::<u64>::try_into(bytes.len()).unwrap();
229            writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&to_push)))?;
230            let round_len: u64 = ((bytes.len() + 7) & !7).try_into().unwrap();
231            position_bytes += round_len;
232        }
233        // Read 3: Append each byte slice, with padding to align starts to `u64`.
234        for (_align, bytes) in iter.as_bytes() {
235            writer.write_all(bytes)?;
236            let padding = ((bytes.len() + 7) & !7) - bytes.len();
237            if padding > 0 {
238                writer.write_all(&[0u8;8][..padding])?;
239            }
240        }
241
242        Ok(())
243    }
244
245    /// Decodes an encoded sequence of byte slices. Each result will be `u64` aligned.
246    pub fn decode(store: &[u64]) -> impl Iterator<Item=&[u8]> {
247        assert!(store[0] % 8 == 0);
248        let slices = (store[0] / 8) - 1;
249        (0 .. slices).map(|i| decode_index(store, i))
250    }
251
252    /// Decodes a specific byte slice by index. It will be `u64` aligned.
253    #[inline(always)]
254    pub fn decode_index(store: &[u64], index: u64) -> &[u8] {
255        debug_assert!(index + 1 < store[0]/8);
256        let index: usize = index.try_into().unwrap();
257        let lower: usize = ((store[index] + 7) & !7).try_into().unwrap();
258        let upper: usize = (store[index + 1]).try_into().unwrap();
259        let bytes: &[u8] = bytemuck::try_cast_slice(store).expect("&[u64] should convert to &[u8]");
260        &bytes[lower .. upper]
261    }
262
263    #[cfg(test)]
264    mod test {
265
266        use crate::{Borrow, ContainerOf};
267        use crate::common::Push;
268        use crate::AsBytes;
269
270        use super::{encode, decode};
271
272        fn assert_roundtrip<'a, AB: AsBytes<'a>>(item: &AB) {
273            let mut store = Vec::new();
274            encode(&mut store, item);
275            assert!(item.as_bytes().map(|x| x.1).eq(decode(&store)));
276        }
277
278        #[test]
279        fn round_trip() {
280
281            let mut column: ContainerOf<Result<u64, String>> = Default::default();
282            for i in 0..10000u64 {
283                column.push(&Ok::<u64, String>(i));
284                column.push(&Err::<u64, String>(format!("{:?}", i)));
285            }
286
287            assert_roundtrip(&column.borrow());
288        }
289    }
290}
291
292/// A container of either typed columns, or serialized bytes that can be borrowed as the former.
293pub mod stash {
294
295    use crate::{Len, FromBytes};
296    use crate::bytes::{EncodeDecode, Indexed};
297
298    /// A container of either typed columns, or serialized bytes that can be borrowed as the former.
299    ///
300    /// When `B` dereferences to a byte slice, the container can be borrowed as if the container type `C`.
301    /// This container inherents the readable properties of `C` through borrowing, but does not implement
302    /// the traits itself.
303    ///
304    /// The container can be cleared and pushed into. When cleared it reverts to a typed variant, and when
305    /// pushed into if the typed variant it will accept the item, and if not it will panic.
306    #[derive(Clone)]
307    pub enum Stash<C, B> {
308        /// The typed variant of the container.
309        Typed(C),
310        /// The bytes variant of the container.
311        Bytes(B),
312        /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
313        ///
314        /// Most commonly this works around misaligned binary data, but it can also be useful if the `B`
315        /// type is a scarce resource that should be released.
316        Align(Box<[u64]>),
317    }
318
319    impl<C: Default, B> Default for Stash<C, B> { fn default() -> Self { Self::Typed(Default::default()) } }
320
321    impl<C: crate::ContainerBytes, B: std::ops::Deref<Target=[u8]> + Clone + 'static> crate::Borrow for Stash<C, B> {
322
323        type Ref<'a> = <C as crate::Borrow>::Ref<'a>;
324        type Borrowed<'a> = <C as crate::Borrow>::Borrowed<'a>;
325
326        #[inline(always)] fn borrow<'a>(&'a self) -> Self::Borrowed<'a> { self.borrow() }
327        #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b> where Self: 'a { <C as crate::Borrow>::reborrow(item) }
328        #[inline(always)] fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b> where Self: 'a { <C as crate::Borrow>::reborrow_ref(item) }
329    }
330
331    impl<C: crate::ContainerBytes, B: std::ops::Deref<Target=[u8]>> Len for Stash<C, B> {
332        #[inline(always)] fn len(&self) -> usize { self.borrow().len() }
333    }
334
335    impl<C: crate::ContainerBytes, B: std::ops::Deref<Target=[u8]>> Stash<C, B> {
336        /// Borrows the contents, either from a typed container or by decoding serialized bytes.
337        ///
338        /// This method is relatively cheap but is not free.
339        #[inline(always)] pub fn borrow<'a>(&'a self) -> <C as crate::Borrow>::Borrowed<'a> {
340            match self {
341                Stash::Typed(t) => t.borrow(),
342                Stash::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
343                Stash::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
344            }
345        }
346        /// The number of bytes needed to write the contents using the `Indexed` encoder.
347        pub fn length_in_bytes(&self) -> usize {
348            match self {
349                // We'll need one u64 for the length, then the length rounded up to a multiple of 8.
350                Stash::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()),
351                Stash::Bytes(b) => b.len(),
352                Stash::Align(a) => 8 * a.len(),
353            }
354        }
355        /// Write the contents into a `std::io::Write` using the `Indexed` encoder.
356        pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
357            match self {
358                Stash::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() },
359                Stash::Bytes(b) => writer.write_all(&b[..]).unwrap(),
360                Stash::Align(a) => writer.write_all(bytemuck::cast_slice(&a[..])).unwrap(),
361            }
362        }
363    }
364
365    impl<T, C: crate::Container + crate::Push<T>, B> crate::Push<T> for Stash<C, B> {
366        fn push(&mut self, item: T) {
367            match self {
368                Stash::Typed(t) => t.push(item),
369                Stash::Bytes(_) | Stash::Align(_) => unimplemented!(),
370            }
371        }
372    }
373
374    impl<C: crate::Clear + Default, B> crate::Clear for Stash<C, B> {
375        fn clear(&mut self) {
376            match self {
377                Stash::Typed(t) => t.clear(),
378                Stash::Bytes(_) | Stash::Align(_) => {
379                    *self = Stash::Typed(Default::default());
380                }
381            }
382        }
383    }
384
385    impl<C: crate::Container, B: std::ops::Deref<Target = [u8]>> From<B> for Stash<C, B> {
386        fn from(bytes: B) -> Self {
387            assert!(bytes.len() % 8 == 0);
388            if bytemuck::try_cast_slice::<_, u64>(&bytes).is_ok() {
389                Self::Bytes(bytes)
390            }
391            else {
392                // Re-locating bytes for alignment reasons.
393                let mut alloc: Vec<u64> = vec![0; bytes.len() / 8];
394                bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]);
395                Self::Align(alloc.into())
396            }
397        }
398    }
399}
400
401#[cfg(test)]
402mod test {
403    use crate::ContainerOf;
404
405    #[test]
406    fn round_trip() {
407
408        use crate::common::{Push, HeapSize, Len, Index};
409        use crate::{Borrow, AsBytes, FromBytes};
410
411        let mut column: ContainerOf<Result<u64, u64>> = Default::default();
412        for i in 0..100u64 {
413            column.push(Ok::<u64, u64>(i));
414            column.push(Err::<u64, u64>(i));
415        }
416
417        assert_eq!(column.len(), 200);
418        assert_eq!(column.heap_size(), (1624, 2080));
419
420        for i in 0..100 {
421            assert_eq!(column.get(2*i+0), Ok(i as u64));
422            assert_eq!(column.get(2*i+1), Err(i as u64));
423        }
424
425        let column2 = crate::Results::<&[u64], &[u64], &[u64], &[u64], &u64>::from_bytes(&mut column.borrow().as_bytes().map(|(_, bytes)| bytes));
426        for i in 0..100 {
427            assert_eq!(column.get(2*i+0), column2.get(2*i+0).copied().map_err(|e| *e));
428            assert_eq!(column.get(2*i+1), column2.get(2*i+1).copied().map_err(|e| *e));
429        }
430
431        let column3 = crate::Results::<&[u64], &[u64], &[u64], &[u64], &u64>::from_bytes(&mut column2.as_bytes().map(|(_, bytes)| bytes));
432        for i in 0..100 {
433            assert_eq!(column3.get(2*i+0), column2.get(2*i+0));
434            assert_eq!(column3.get(2*i+1), column2.get(2*i+1));
435        }
436    }
437}