1#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22
23use std::hash::Hash;
24
25use columnar::Borrow;
26use columnar::bytes::indexed;
27use columnar::common::IterOwn;
28use columnar::{Columnar, Ref};
29use columnar::{FromBytes, Index, Len};
30use differential_dataflow::Hashable;
31use differential_dataflow::containers::TimelyStack;
32use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
33use differential_dataflow::trace::implementations::merge_batcher::container::ColInternalMerger;
34use mz_ore::region::Region;
35use timely::Accountable;
36use timely::bytes::arc::Bytes;
37use timely::container::{DrainContainer, PushInto};
38use timely::dataflow::channels::ContainerBytes;
39
/// A [`MergeBatcher`] whose input is [`Column`]s of `((K, V), T, R)` updates.
///
/// Input columns are chunked by [`batcher::Chunker`] into [`TimelyStack`] chunks, and those
/// chunks are merged with [`ColInternalMerger`].
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
    Column<((K, V), T, R)>,
    batcher::Chunker<TimelyStack<((K, V), T, R)>>,
    ColInternalMerger<(K, V), T, R>,
>;
/// A [`Col2ValBatcher`] for key-only updates, using `()` as the value type.
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
48
/// A container of columnar data of type `C`.
///
/// The data is held either as an owned, typed columnar container, or as serialized bytes in
/// the `columnar` crate's `indexed` encoding that are decoded on demand by `borrow`.
pub enum Column<C: Columnar> {
    /// An owned, typed columnar container. This is the only variant that `push_into`
    /// appends to; the other variants panic on push.
    Typed(C::Container),
    /// Serialized bytes in the `indexed` encoding. They are read by reinterpreting them as
    /// `u64` words, so they must be 8-byte aligned and a multiple of 8 bytes long.
    Bytes(Bytes),
    /// Serialized data copied into an aligned `u64` region. This is used when received bytes
    /// could not be used in place (see `ContainerBytes::from_bytes`) and when cloning a
    /// `Bytes` column.
    Align(Region<u64>),
}
65
66impl<C: Columnar> Column<C> {
67 #[inline]
69 pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
70 match self {
71 Column::Typed(t) => t.borrow(),
72 Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
73 &mut indexed::decode(bytemuck::cast_slice(b)),
74 ),
75 Column::Align(a) => {
76 <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
77 }
78 }
79 }
80}
81
82impl<C: Columnar> Default for Column<C> {
83 fn default() -> Self {
84 Self::Typed(Default::default())
85 }
86}
87
88impl<C: Columnar> Clone for Column<C>
89where
90 C::Container: Clone,
91{
92 fn clone(&self) -> Self {
93 match self {
94 Column::Typed(t) => Column::Typed(t.clone()),
97 Column::Bytes(b) => {
98 assert_eq!(b.len() % 8, 0);
99 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8);
100 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
101 alloc_bytes[..b.len()].copy_from_slice(b);
102 Self::Align(alloc)
103 }
104 Column::Align(a) => {
105 let mut alloc = crate::containers::alloc_aligned_zeroed(a.len());
106 alloc[..a.len()].copy_from_slice(a);
107 Column::Align(alloc)
108 }
109 }
110 }
111}
112
113impl<C: Columnar> Accountable for Column<C> {
114 #[inline]
115 fn record_count(&self) -> i64 {
116 self.borrow().len().try_into().expect("Must fit")
117 }
118}
119impl<C: Columnar> DrainContainer for Column<C> {
120 type Item<'a> = Ref<'a, C>;
121 type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
122 #[inline]
123 fn drain(&mut self) -> Self::DrainIter<'_> {
124 self.borrow().into_index_iter()
125 }
126}
127
128impl<C: Columnar, T> PushInto<T> for Column<C>
129where
130 C::Container: columnar::Push<T>,
131{
132 #[inline]
133 fn push_into(&mut self, item: T) {
134 use columnar::Push;
135 match self {
136 Column::Typed(t) => t.push(item),
137 Column::Align(_) | Column::Bytes(_) => {
138 unimplemented!("Pushing into Column::Bytes without first clearing");
141 }
142 }
143 }
144}
145
146impl<C: Columnar> ContainerBytes for Column<C> {
147 #[inline]
148 fn from_bytes(bytes: Bytes) -> Self {
149 assert_eq!(bytes.len() % 8, 0);
155 if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
156 Self::Bytes(bytes)
157 } else {
158 let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8);
160 let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
161 alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
162 Self::Align(alloc)
163 }
164 }
165
166 #[inline]
167 fn length_in_bytes(&self) -> usize {
168 match self {
169 Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
170 Column::Bytes(b) => b.len(),
171 Column::Align(a) => 8 * a.len(),
172 }
173 }
174
175 #[inline]
176 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
177 match self {
178 Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
179 Column::Bytes(b) => writer.write_all(b).unwrap(),
180 Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
181 }
182 }
183}
184
185#[inline(always)]
189pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
190where
191 K: Columnar,
192 for<'a> Ref<'a, K>: Hash,
193 V: Columnar,
194 D: Columnar,
195 T: Columnar,
196{
197 k.hashed()
198}
199
#[cfg(test)]
mod tests {
    use mz_ore::region::Region;
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// The `indexed` encoding of the `i32` column `[1, 2, 3]`, written out by hand.
    ///
    /// It consists of two little-endian `u64` offsets (16, then 28), the three values as
    /// little-endian `i32`s, and four zero bytes that pad the total length to a multiple of 8.
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::with_capacity(32);
        for offset in [16_u64, 28] {
            raw.extend_from_slice(&offset.to_le_bytes());
        }
        for value in [1_i32, 2, 3] {
            raw.extend_from_slice(&value.to_le_bytes());
        }
        raw.extend_from_slice(&[0_u8; 4]);
        raw
    }

    /// Decodes `column` and collects references to its elements, in order.
    fn contents(column: &Column<i32>) -> Vec<&i32> {
        column.borrow().into_index_iter().collect()
    }

    /// Copies `raw` into a new `Bytes` whose start lies `skew` bytes past an 8-byte boundary.
    fn bytes_with_skew(raw: &[u8], skew: usize) -> timely::bytes::arc::Bytes {
        let buffer = vec![0_u8; raw.len() + 8];
        let to_boundary = buffer.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buffer);
        let _ = bytes_mut.extract_to(to_boundary + skew);
        bytes_mut[..raw.len()].copy_from_slice(raw);
        bytes_mut.extract_to(raw.len())
    }

    #[mz_ore::test]
    fn test_column_clone() {
        let expected = vec![&1, &2, &3];

        let typed: Column<i32> = Column::Typed(Columnar::as_columns([1, 2, 3].iter()));
        let typed_copy = typed.clone();
        assert_eq!(contents(&typed_copy), expected);

        let serialized: Column<i32> =
            Column::Bytes(BytesMut::from(raw_columnar_bytes()).freeze());
        let serialized_copy = serialized.clone();
        assert_eq!(contents(&serialized_copy), expected);

        let raw = raw_columnar_bytes();
        let mut region: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8);
        bytemuck::cast_slice_mut::<u64, u8>(&mut region)[..raw.len()].copy_from_slice(&raw);
        let aligned: Column<i32> = Column::Align(region);
        let aligned_copy = aligned.clone();
        assert_eq!(contents(&aligned_copy), expected);
    }

    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        for value in [1_i32, 2, 3] {
            column.push_into(value);
        }
        let mut encoded: Vec<u8> = Vec::new();
        column.into_bytes(&mut encoded);
        assert_eq!(encoded, raw_columnar_bytes());
    }

    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();
        let expected = vec![&1, &2, &3];

        let aligned: Column<i32> = Column::from_bytes(bytes_with_skew(&raw, 0));
        assert!(matches!(aligned, Column::Bytes(_)));
        assert_eq!(contents(&aligned), expected);

        let unaligned: Column<i32> = Column::from_bytes(bytes_with_skew(&raw, 1));
        assert!(matches!(unaligned, Column::Align(_)));
        assert_eq!(contents(&unaligned), expected);
    }
}