1#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22pub mod consolidate;
23
24use std::hash::Hash;
25
26use columnar::Borrow;
27use columnar::bytes::indexed;
28use columnar::common::IterOwn;
29use columnar::{Columnar, Ref};
30use columnar::{FromBytes, Index, Len};
31use differential_dataflow::Hashable;
32use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
33use timely::Accountable;
34use timely::bytes::arc::Bytes;
35use timely::container::{DrainContainer, PushInto};
36use timely::dataflow::channels::ContainerBytes;
37
38use crate::columnation::{ColInternalMerger, ColumnationStack};
39
/// A [`MergeBatcher`] that accepts updates of type `((K, V), T, R)` in [`Column`] containers.
///
/// Incoming columns are chunked by [`batcher::Chunker`] into [`ColumnationStack`] chunks,
/// which are then merged with [`ColInternalMerger`].
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
    Column<((K, V), T, R)>,
    batcher::Chunker<ColumnationStack<((K, V), T, R)>>,
    ColInternalMerger<(K, V), T, R>,
>;
/// A [`Col2ValBatcher`] for key-only updates, using `()` as the value type.
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
48
/// A container of `C` values held in one of three representations.
///
/// All three are read through [`Column::borrow`]. Only [`Column::Typed`] accepts pushes;
/// pushing into the other variants panics.
pub enum Column<C: Columnar> {
    /// An owned, typed columnar container.
    Typed(C::Container),
    /// Serialized data in the `indexed` encoding, read in place as `u64` words.
    ///
    /// `ContainerBytes::from_bytes` produces this variant only when the bytes can be
    /// reinterpreted as a `u64` slice without copying.
    Bytes(Bytes),
    /// Serialized data in the `indexed` encoding, held in an owned `u64` buffer.
    ///
    /// Produced when incoming bytes could not be read in place (misaligned), and when
    /// cloning a `Bytes` column.
    Align(Vec<u64>),
}
67
68impl<C: Columnar> Column<C> {
69 #[inline]
71 pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
72 match self {
73 Column::Typed(t) => t.borrow(),
74 Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
75 &mut indexed::decode(bytemuck::cast_slice(b)),
76 ),
77 Column::Align(a) => {
78 <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
79 }
80 }
81 }
82}
83
84impl<C: Columnar> Default for Column<C> {
85 fn default() -> Self {
86 Self::Typed(Default::default())
87 }
88}
89
90impl<C: Columnar> Clone for Column<C>
91where
92 C::Container: Clone,
93{
94 fn clone(&self) -> Self {
95 match self {
96 Column::Typed(t) => Column::Typed(t.clone()),
99 Column::Bytes(b) => {
100 assert_eq!(b.len() % 8, 0);
101 Self::Align(bytemuck::allocation::pod_collect_to_vec(b))
102 }
103 Column::Align(a) => Column::Align(a.clone()),
104 }
105 }
106}
107
108impl<C: Columnar> Accountable for Column<C> {
109 #[inline]
110 fn record_count(&self) -> i64 {
111 self.borrow().len().try_into().expect("Must fit")
112 }
113}
114impl<C: Columnar> DrainContainer for Column<C> {
115 type Item<'a> = Ref<'a, C>;
116 type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
117 #[inline]
118 fn drain(&mut self) -> Self::DrainIter<'_> {
119 self.borrow().into_index_iter()
120 }
121}
122
123impl<C: Columnar, T> PushInto<T> for Column<C>
124where
125 C::Container: columnar::Push<T>,
126{
127 #[inline]
128 fn push_into(&mut self, item: T) {
129 use columnar::Push;
130 match self {
131 Column::Typed(t) => t.push(item),
132 Column::Align(_) | Column::Bytes(_) => {
133 unimplemented!("Pushing into Column::Bytes without first clearing");
136 }
137 }
138 }
139}
140
141impl<C: Columnar> ContainerBytes for Column<C> {
142 #[inline]
143 fn from_bytes(bytes: Bytes) -> Self {
144 assert_eq!(bytes.len() % 8, 0);
150 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
151 Self::Bytes(bytes)
152 } else {
153 Self::Align(bytemuck::allocation::pod_collect_to_vec(&bytes[..]))
156 }
157 }
158
159 #[inline]
160 fn length_in_bytes(&self) -> usize {
161 match self {
162 Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
163 Column::Bytes(b) => b.len(),
164 Column::Align(a) => 8 * a.len(),
165 }
166 }
167
168 #[inline]
169 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
170 match self {
171 Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
172 Column::Bytes(b) => writer.write_all(b).unwrap(),
173 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
174 }
175 }
176}
177
178#[inline(always)]
182pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
183where
184 K: Columnar,
185 for<'a> Ref<'a, K>: Hash,
186 V: Columnar,
187 D: Columnar,
188 T: Columnar,
189{
190 k.hashed()
191}
192
#[cfg(test)]
mod tests {
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Returns the serialized form of the `i32` sequence `[1, 2, 3]`, matching what
    /// `ContainerBytes::into_bytes` writes for a `Typed` column (see `test_column_known_bytes`).
    ///
    /// The 32-byte layout is:
    /// - two little-endian `u64` header words, 16 and 28 (presumably the byte bounds of the
    ///   payload; confirm against `indexed`);
    /// - the three little-endian `i32`s;
    /// - four zero bytes padding the total to a multiple of 8.
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::new();
        for header_word in [16_u64, 28] {
            raw.extend(header_word.to_le_bytes());
        }
        for value in [1_i32, 2, 3] {
            raw.extend(value.to_le_bytes());
        }
        raw.extend([0, 0, 0, 0]);
        raw
    }

    /// Cloning any representation yields a column that reads back the same records.
    #[mz_ore::test]
    fn test_column_clone() {
        let expected = vec![&1, &2, &3];

        let columns = Columnar::as_columns([1, 2, 3].iter());
        let typed: Column<i32> = Column::Typed(columns);

        let bytes: Column<i32> = Column::Bytes(BytesMut::from(raw_columnar_bytes()).freeze());

        let raw = raw_columnar_bytes();
        let mut words: Vec<u64> = vec![0; raw.len() / 8];
        bytemuck::cast_slice_mut::<u64, u8>(&mut words[..])[..raw.len()].copy_from_slice(&raw);
        let align: Column<i32> = Column::Align(words);

        for column in [typed, bytes, align] {
            let copy = column.clone();
            assert_eq!(copy.borrow().into_index_iter().collect::<Vec<_>>(), expected);
        }
    }

    /// Pushing `[1, 2, 3]` and serializing produces exactly `raw_columnar_bytes()`.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        for value in [1, 2, 3] {
            column.push_into(value);
        }
        let mut written = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut written));
        assert_eq!(written, raw_columnar_bytes());
    }

    /// `from_bytes` keeps aligned input in place and copies misaligned input.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();
        let expected = vec![&1, &2, &3];

        // Copies `raw` into a fresh buffer, `shift` bytes past its first 8-byte-aligned
        // address, and returns a handle to exactly those `raw.len()` bytes.
        let copy_shifted = |shift: usize| {
            let buf = vec![0_u8; raw.len() + 8];
            let aligned_start = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
            let mut bytes_mut = BytesMut::from(buf);
            let _ = bytes_mut.extract_to(aligned_start + shift);
            bytes_mut[..raw.len()].copy_from_slice(&raw);
            bytes_mut.extract_to(raw.len())
        };

        let column: Column<i32> = Column::from_bytes(copy_shifted(0));
        assert!(matches!(column, Column::Bytes(_)));
        assert_eq!(column.borrow().into_index_iter().collect::<Vec<_>>(), expected);

        let column: Column<i32> = Column::from_bytes(copy_shifted(1));
        assert!(matches!(column, Column::Align(_)));
        assert_eq!(column.borrow().into_index_iter().collect::<Vec<_>>(), expected);
    }
}