1#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22
23use std::hash::Hash;
24
25use columnar::Container as _;
26use columnar::bytes::{EncodeDecode, Indexed};
27use columnar::common::IterOwn;
28use columnar::{Columnar, Ref};
29use columnar::{FromBytes, Index, Len};
30use differential_dataflow::Hashable;
31use differential_dataflow::containers::TimelyStack;
32use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
33use mz_ore::region::Region;
34use timely::Accountable;
35use timely::bytes::arc::Bytes;
36use timely::container::{DrainContainer, IterContainer, PushInto};
37use timely::dataflow::channels::ContainerBytes;
38
39pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
41 Column<((K, V), T, R)>,
42 batcher::Chunker<TimelyStack<((K, V), T, R)>>,
43 ColMerger<(K, V), T, R>,
44>;
45pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
47
48pub enum Column<C: Columnar> {
54 Typed(C::Container),
56 Bytes(Bytes),
58 Align(Region<u64>),
63}
64
65impl<C: Columnar> Column<C> {
66 #[inline]
68 pub fn borrow(&self) -> <C::Container as columnar::Container>::Borrowed<'_> {
69 match self {
70 Column::Typed(t) => t.borrow(),
71 Column::Bytes(b) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
72 &mut Indexed::decode(bytemuck::cast_slice(b)),
73 ),
74 Column::Align(a) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
75 &mut Indexed::decode(a),
76 ),
77 }
78 }
79}
80
81impl<C: Columnar> Default for Column<C> {
82 fn default() -> Self {
83 Self::Typed(Default::default())
84 }
85}
86
87impl<C: Columnar> Clone for Column<C>
88where
89 C::Container: Clone,
90{
91 fn clone(&self) -> Self {
92 match self {
93 Column::Typed(t) => Column::Typed(t.clone()),
96 Column::Bytes(b) => {
97 assert_eq!(b.len() % 8, 0);
98 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8);
99 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
100 alloc_bytes[..b.len()].copy_from_slice(b);
101 Self::Align(alloc)
102 }
103 Column::Align(a) => {
104 let mut alloc = crate::containers::alloc_aligned_zeroed(a.len());
105 alloc[..a.len()].copy_from_slice(a);
106 Column::Align(alloc)
107 }
108 }
109 }
110}
111
112impl<C: Columnar> Accountable for Column<C> {
113 #[inline]
114 fn record_count(&self) -> i64 {
115 self.borrow().len().try_into().expect("Must fit")
116 }
117}
118impl<C: Columnar> DrainContainer for Column<C> {
119 type Item<'a> = columnar::Ref<'a, C>;
120 type DrainIter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
121 #[inline]
122 fn drain(&mut self) -> Self::DrainIter<'_> {
123 self.borrow().into_index_iter()
124 }
125}
126impl<C: Columnar> IterContainer for Column<C> {
127 type ItemRef<'a> = columnar::Ref<'a, C>;
128 type Iter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
129 #[inline]
130 fn iter(&self) -> Self::Iter<'_> {
131 self.borrow().into_index_iter()
132 }
133}
134
135impl<C: Columnar, T> PushInto<T> for Column<C>
136where
137 C::Container: columnar::Push<T>,
138{
139 #[inline]
140 fn push_into(&mut self, item: T) {
141 use columnar::Push;
142 match self {
143 Column::Typed(t) => t.push(item),
144 Column::Align(_) | Column::Bytes(_) => {
145 unimplemented!("Pushing into Column::Bytes without first clearing");
148 }
149 }
150 }
151}
152
153impl<C: Columnar> ContainerBytes for Column<C> {
154 #[inline]
155 fn from_bytes(bytes: Bytes) -> Self {
156 assert_eq!(bytes.len() % 8, 0);
162 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
163 Self::Bytes(bytes)
164 } else {
165 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8);
167 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
168 alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
169 Self::Align(alloc)
170 }
171 }
172
173 #[inline]
174 fn length_in_bytes(&self) -> usize {
175 match self {
176 Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()),
177 Column::Bytes(b) => b.len(),
178 Column::Align(a) => 8 * a.len(),
179 }
180 }
181
182 #[inline]
183 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
184 match self {
185 Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(),
186 Column::Bytes(b) => writer.write_all(b).unwrap(),
187 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
188 }
189 }
190}
191
192#[inline(always)]
196pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
197where
198 K: Columnar,
199 for<'a> Ref<'a, K>: Hash,
200 V: Columnar,
201 D: Columnar,
202 T: Columnar,
203{
204 k.hashed()
205}
206
#[cfg(test)]
mod tests {
    use mz_ore::region::Region;
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Produces the serialized bytes of the `i32` column `[1, 2, 3]`.
    ///
    /// `test_column_known_bytes` pins this to be exactly what `into_bytes` emits for that
    /// column.
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw: Vec<u8> = Vec::new();
        // Two leading `u64` words; presumably the byte offsets delimiting the payload
        // (16 bytes of header, payload ending at byte 28).
        for word in [16_u64, 28] {
            raw.extend(word.to_le_bytes());
        }
        // The three `i32` values.
        for value in [1_i32, 2, 3] {
            raw.extend(value.to_le_bytes());
        }
        // Padding up to a whole number of `u64` words.
        raw.extend([0, 0, 0, 0]);
        raw
    }

    /// Asserts that iterating `column` yields exactly `1, 2, 3`.
    fn assert_one_two_three(column: &Column<i32>) {
        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
    }

    /// Copies `raw` into a `Bytes` that starts `skew` bytes past a `u64`-aligned address.
    fn bytes_with_skew(raw: &[u8], skew: usize) -> Bytes {
        // Eight spare bytes leave room to skip up to the next aligned address plus `skew`.
        let buf = vec![0_u8; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align + skew);
        bytes_mut[..raw.len()].copy_from_slice(raw);
        bytes_mut.extract_to(raw.len())
    }

    /// Cloning preserves the contents for every representation.
    #[mz_ore::test]
    fn test_column_clone() {
        // Typed representation.
        let columns = Columnar::as_columns([1, 2, 3].iter());
        let column_typed: Column<i32> = Column::Typed(columns);
        assert_one_two_three(&column_typed.clone());

        // `Bytes` representation.
        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let column_bytes: Column<i32> = Column::Bytes(bytes);
        assert_one_two_three(&column_bytes.clone());

        // `Align` representation.
        let raw = raw_columnar_bytes();
        let mut region: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8);
        let region_bytes: &mut [u8] = bytemuck::cast_slice_mut(&mut region);
        region_bytes[..raw.len()].copy_from_slice(&raw);
        let column_align: Column<i32> = Column::Align(region);
        assert_one_two_three(&column_align.clone());
    }

    /// Serializing a typed column produces the known byte fixture.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column = Column::<i32>::default();
        for value in [1_i32, 2, 3] {
            column.push_into(value);
        }
        let mut data = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut data));
        assert_eq!(data, raw_columnar_bytes());
    }

    /// `from_bytes` keeps aligned input as `Bytes` and relocates misaligned input to `Align`.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        let aligned_bytes = bytes_with_skew(&raw, 0);
        let column: Column<i32> = Column::from_bytes(aligned_bytes);
        assert!(matches!(column, Column::Bytes(_)));
        assert_one_two_three(&column);

        let unaligned_bytes = bytes_with_skew(&raw, 1);
        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
        assert!(matches!(column, Column::Align(_)));
        assert_one_two_three(&column);
    }
}