//! Columnar containers for timely dataflow: the [`Column`] container, merge-batcher type
//! aliases built on it, and a key-based exchange function for columnar updates.

// Every public item in this module must carry documentation.
#![deny(missing_docs)]

pub mod batcher;
pub mod builder;
22
23use std::hash::Hash;
24
25use columnar::Borrow;
26use columnar::bytes::{EncodeDecode, Indexed};
27use columnar::common::IterOwn;
28use columnar::{Columnar, Ref};
29use columnar::{FromBytes, Index, Len};
30use differential_dataflow::Hashable;
31use differential_dataflow::containers::TimelyStack;
32use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
33use mz_ore::region::Region;
34use timely::Accountable;
35use timely::bytes::arc::Bytes;
36use timely::container::{DrainContainer, PushInto};
37use timely::dataflow::channels::ContainerBytes;
38
39pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
41 Column<((K, V), T, R)>,
42 batcher::Chunker<TimelyStack<((K, V), T, R)>>,
43 ColMerger<(K, V), T, R>,
44>;
45pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
47
48pub enum Column<C: Columnar> {
54 Typed(C::Container),
56 Bytes(Bytes),
58 Align(Region<u64>),
63}
64
65impl<C: Columnar> Column<C> {
66 #[inline]
68 pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
69 match self {
70 Column::Typed(t) => t.borrow(),
71 Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
72 &mut Indexed::decode(bytemuck::cast_slice(b)),
73 ),
74 Column::Align(a) => {
75 <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut Indexed::decode(a))
76 }
77 }
78 }
79}
80
81impl<C: Columnar> Default for Column<C> {
82 fn default() -> Self {
83 Self::Typed(Default::default())
84 }
85}
86
87impl<C: Columnar> Clone for Column<C>
88where
89 C::Container: Clone,
90{
91 fn clone(&self) -> Self {
92 match self {
93 Column::Typed(t) => Column::Typed(t.clone()),
96 Column::Bytes(b) => {
97 assert_eq!(b.len() % 8, 0);
98 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8);
99 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
100 alloc_bytes[..b.len()].copy_from_slice(b);
101 Self::Align(alloc)
102 }
103 Column::Align(a) => {
104 let mut alloc = crate::containers::alloc_aligned_zeroed(a.len());
105 alloc[..a.len()].copy_from_slice(a);
106 Column::Align(alloc)
107 }
108 }
109 }
110}
111
112impl<C: Columnar> Accountable for Column<C> {
113 #[inline]
114 fn record_count(&self) -> i64 {
115 self.borrow().len().try_into().expect("Must fit")
116 }
117}
118impl<C: Columnar> DrainContainer for Column<C> {
119 type Item<'a> = Ref<'a, C>;
120 type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
121 #[inline]
122 fn drain(&mut self) -> Self::DrainIter<'_> {
123 self.borrow().into_index_iter()
124 }
125}
126
127impl<C: Columnar, T> PushInto<T> for Column<C>
128where
129 C::Container: columnar::Push<T>,
130{
131 #[inline]
132 fn push_into(&mut self, item: T) {
133 use columnar::Push;
134 match self {
135 Column::Typed(t) => t.push(item),
136 Column::Align(_) | Column::Bytes(_) => {
137 unimplemented!("Pushing into Column::Bytes without first clearing");
140 }
141 }
142 }
143}
144
145impl<C: Columnar> ContainerBytes for Column<C> {
146 #[inline]
147 fn from_bytes(bytes: Bytes) -> Self {
148 assert_eq!(bytes.len() % 8, 0);
154 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
155 Self::Bytes(bytes)
156 } else {
157 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8);
159 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
160 alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
161 Self::Align(alloc)
162 }
163 }
164
165 #[inline]
166 fn length_in_bytes(&self) -> usize {
167 match self {
168 Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()),
169 Column::Bytes(b) => b.len(),
170 Column::Align(a) => 8 * a.len(),
171 }
172 }
173
174 #[inline]
175 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
176 match self {
177 Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(),
178 Column::Bytes(b) => writer.write_all(b).unwrap(),
179 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
180 }
181 }
182}
183
184#[inline(always)]
188pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
189where
190 K: Columnar,
191 for<'a> Ref<'a, K>: Hash,
192 V: Columnar,
193 D: Columnar,
194 T: Columnar,
195{
196 k.hashed()
197}
198
#[cfg(test)]
mod tests {
    use mz_ore::region::Region;
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// The encoding of the `i32` values `[1, 2, 3]` expected from [`Column`].
    ///
    /// The layout is:
    /// - two `u64` header words, 16 and 28 (presumably the byte offsets bounding the value
    ///   data; confirm against `columnar`'s `Indexed` encoding);
    /// - the three little-endian `i32`s;
    /// - four zero bytes, padding the total to a multiple of 8.
    fn raw_columnar_bytes() -> Vec<u8> {
        let header = [16_u64, 28].iter().flat_map(|word| word.to_le_bytes());
        let values = [1_i32, 2, 3].iter().flat_map(|value| value.to_le_bytes());
        let padding = [0_u8; 4];
        header.chain(values).chain(padding).collect()
    }

    /// Asserts that `column` decodes to exactly the values `1, 2, 3`.
    fn assert_one_two_three(column: &Column<i32>) {
        let decoded: Vec<_> = column.borrow().into_index_iter().collect();
        assert_eq!(decoded, vec![&1, &2, &3]);
    }

    /// Builds a column with [`ContainerBytes::from_bytes`] from a copy of `raw`.
    ///
    /// The copy lives in a fresh buffer and starts `skew` bytes past an 8-byte boundary.
    fn column_from_skewed_bytes(raw: &[u8], skew: usize) -> Column<i32> {
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align + skew);
        bytes_mut[..raw.len()].copy_from_slice(raw);
        Column::from_bytes(bytes_mut.extract_to(raw.len()))
    }

    #[mz_ore::test]
    fn test_column_clone() {
        // Typed variant.
        let columns = Columnar::as_columns([1, 2, 3].iter());
        let typed: Column<i32> = Column::Typed(columns);
        assert_one_two_three(&typed.clone());

        // Bytes variant, backed by a frozen buffer.
        let frozen = BytesMut::from(raw_columnar_bytes()).freeze();
        let from_bytes: Column<i32> = Column::Bytes(frozen);
        assert_one_two_three(&from_bytes.clone());

        // Align variant, backed by an aligned `u64` allocation.
        let raw = raw_columnar_bytes();
        let mut words: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8);
        bytemuck::cast_slice_mut::<u64, u8>(&mut words)[..raw.len()].copy_from_slice(&raw);
        let aligned: Column<i32> = Column::Align(words);
        assert_one_two_three(&aligned.clone());
    }

    /// Pins the encoding of a typed column, so failures in the other tests are easier to
    /// diagnose.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        for value in [1, 2, 3] {
            column.push_into(value);
        }
        let mut encoded: Vec<u8> = Vec::new();
        column.into_bytes(&mut encoded);
        assert_eq!(encoded, raw_columnar_bytes());
    }

    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Aligned input is kept as-is, without copying.
        let aligned = column_from_skewed_bytes(&raw, 0);
        assert!(matches!(aligned, Column::Bytes(_)));
        assert_one_two_three(&aligned);

        // Misaligned input is copied into an aligned allocation.
        let misaligned = column_from_skewed_bytes(&raw, 1);
        assert!(matches!(misaligned, Column::Align(_)));
        assert_one_two_three(&misaligned);
    }
}