1#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22
23use std::hash::Hash;
24
25use columnar::Container as _;
26use columnar::bytes::{EncodeDecode, Indexed};
27use columnar::common::IterOwn;
28use columnar::{Clear, FromBytes, Index, Len};
29use columnar::{Columnar, Ref};
30use differential_dataflow::Hashable;
31use differential_dataflow::containers::TimelyStack;
32use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
33use mz_ore::region::Region;
34use timely::Container;
35use timely::bytes::arc::Bytes;
36use timely::container::PushInto;
37use timely::dataflow::channels::ContainerBytes;
38
39pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
41 Column<((K, V), T, R)>,
42 batcher::Chunker<TimelyStack<((K, V), T, R)>>,
43 ColMerger<(K, V), T, R>,
44>;
45pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
47
48pub enum Column<C: Columnar> {
54 Typed(C::Container),
56 Bytes(Bytes),
58 Align(Region<u64>),
63}
64
65impl<C: Columnar> Column<C> {
66 #[inline]
68 fn borrow(&self) -> <C::Container as columnar::Container>::Borrowed<'_> {
69 match self {
70 Column::Typed(t) => t.borrow(),
71 Column::Bytes(b) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
72 &mut Indexed::decode(bytemuck::cast_slice(b)),
73 ),
74 Column::Align(a) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
75 &mut Indexed::decode(a),
76 ),
77 }
78 }
79}
80
81impl<C: Columnar> Default for Column<C> {
82 fn default() -> Self {
83 Self::Typed(Default::default())
84 }
85}
86
87impl<C: Columnar> Clone for Column<C>
88where
89 C::Container: Clone,
90{
91 fn clone(&self) -> Self {
92 match self {
93 Column::Typed(t) => Column::Typed(t.clone()),
96 Column::Bytes(b) => {
97 assert_eq!(b.len() % 8, 0);
98 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8);
99 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
100 alloc_bytes[..b.len()].copy_from_slice(b);
101 Self::Align(alloc)
102 }
103 Column::Align(a) => {
104 let mut alloc = crate::containers::alloc_aligned_zeroed(a.len());
105 alloc[..a.len()].copy_from_slice(a);
106 Column::Align(alloc)
107 }
108 }
109 }
110}
111
112impl<C: Columnar> Container for Column<C> {
113 type ItemRef<'a> = columnar::Ref<'a, C>;
114 type Item<'a> = columnar::Ref<'a, C>;
115
116 #[inline]
117 fn len(&self) -> usize {
118 self.borrow().len()
119 }
120
121 #[inline]
123 fn clear(&mut self) {
124 match self {
125 Column::Typed(t) => t.clear(),
126 Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()),
127 }
128 }
129
130 type Iter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
131
132 #[inline]
133 fn iter(&self) -> Self::Iter<'_> {
134 self.borrow().into_index_iter()
135 }
136
137 type DrainIter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
138
139 #[inline]
140 fn drain(&mut self) -> Self::DrainIter<'_> {
141 self.borrow().into_index_iter()
142 }
143}
144
145impl<C: Columnar, T> PushInto<T> for Column<C>
146where
147 C::Container: columnar::Push<T>,
148{
149 #[inline]
150 fn push_into(&mut self, item: T) {
151 use columnar::Push;
152 match self {
153 Column::Typed(t) => t.push(item),
154 Column::Align(_) | Column::Bytes(_) => {
155 unimplemented!("Pushing into Column::Bytes without first clearing");
158 }
159 }
160 }
161}
162
163impl<C: Columnar> ContainerBytes for Column<C> {
164 #[inline]
165 fn from_bytes(bytes: Bytes) -> Self {
166 assert_eq!(bytes.len() % 8, 0);
172 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
173 Self::Bytes(bytes)
174 } else {
175 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8);
177 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
178 alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
179 Self::Align(alloc)
180 }
181 }
182
183 #[inline]
184 fn length_in_bytes(&self) -> usize {
185 match self {
186 Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()),
187 Column::Bytes(b) => b.len(),
188 Column::Align(a) => 8 * a.len(),
189 }
190 }
191
192 #[inline]
193 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
194 match self {
195 Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(),
196 Column::Bytes(b) => writer.write_all(b).unwrap(),
197 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
198 }
199 }
200}
201
202#[inline(always)]
206pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
207where
208 K: Columnar,
209 for<'a> Ref<'a, K>: Hash,
210 V: Columnar,
211 D: Columnar,
212 T: Columnar,
213{
214 k.hashed()
215}
216
#[cfg(test)]
mod tests {
    use mz_ore::region::Region;
    use timely::Container;
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// The 32 bytes that a `Column<i32>` holding `1, 2, 3` serializes to, as pinned by
    /// `test_column_known_bytes`: two little-endian `u64` header words, the three `i32`
    /// values, and four trailing zero bytes.
    fn raw_columnar_bytes() -> Vec<u8> {
        let header = [16_u64, 28_u64];
        let values = [1_i32, 2_i32, 3_i32];

        let mut raw = Vec::new();
        for word in header {
            raw.extend(word.to_le_bytes());
        }
        for value in values {
            raw.extend(value.to_le_bytes());
        }
        raw.extend([0, 0, 0, 0]);
        raw
    }

    /// Asserts that `column` iterates exactly the items `1, 2, 3`.
    fn assert_holds_one_two_three(column: &Column<i32>) {
        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
    }

    /// Cloning must preserve the contents for every variant.
    #[mz_ore::test]
    fn test_column_clone() {
        // Typed variant.
        let typed: Column<i32> = Column::Typed(Columnar::as_columns([1, 2, 3].iter()));
        assert_holds_one_two_three(&typed.clone());

        // Bytes variant.
        let from_bytes: Column<i32> =
            Column::Bytes(BytesMut::from(raw_columnar_bytes()).freeze());
        assert_holds_one_two_three(&from_bytes.clone());

        // Align variant: copy the raw bytes into an aligned region first.
        let raw = raw_columnar_bytes();
        let mut region: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8);
        bytemuck::cast_slice_mut(&mut region)[..raw.len()].copy_from_slice(&raw);
        let aligned: Column<i32> = Column::Align(region);
        assert_holds_one_two_three(&aligned.clone());
    }

    /// Serializing a typed column must produce the known byte fixture.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        for value in [1, 2, 3] {
            column.push_into(value);
        }

        let mut data = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut data));
        assert_eq!(data, raw_columnar_bytes());
    }

    /// `from_bytes` keeps aligned input as `Bytes` and relocates misaligned input to `Align`.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Builds a buffer holding `raw` that starts `skew` bytes past a `u64` boundary.
        let carve = |skew: usize| {
            let backing = vec![0; raw.len() + 8];
            let to_boundary = backing.as_ptr().align_offset(std::mem::size_of::<u64>());
            let mut bytes_mut = BytesMut::from(backing);
            let _ = bytes_mut.extract_to(to_boundary + skew);
            bytes_mut[..raw.len()].copy_from_slice(&raw);
            bytes_mut.extract_to(raw.len())
        };

        // Aligned input is used in place.
        let column: Column<i32> = Column::from_bytes(carve(0));
        assert!(matches!(column, Column::Bytes(_)));
        assert_holds_one_two_three(&column);

        // Input one byte off alignment must be relocated.
        let column: Column<i32> = Column::from_bytes(carve(1));
        assert!(matches!(column, Column::Align(_)));
        assert_holds_one_two_three(&column);
    }
}