1use std::collections::VecDeque;
23use std::iter::FromIterator;
24
25use columnation::{Columnation, Region};
26use differential_dataflow::consolidation::consolidate_updates;
27use differential_dataflow::difference::Semigroup;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
30use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput};
31use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
32use timely::progress::Timestamp;
33use timely::progress::frontier::{Antichain, AntichainRef};
34use timely::{Accountable, PartialOrder};
35
/// An append-only vector of `T` whose out-of-line data is copied into a
/// columnation [`Region`].
///
/// Elements of `local` are produced by `Region::copy` on `inner` (see `copy`)
/// and may reference memory owned by `inner`. For that reason elements are
/// never dropped individually: `clear` shortens `local` with `set_len` and
/// then clears `inner`.
pub struct ColumnationStack<T: Columnation> {
    /// Region-backed copies of the stored records, in insertion order.
    local: Vec<T>,
    /// Region owning the out-of-line data referenced by `local`.
    inner: T::InnerRegion,
}
53
54impl<T: Columnation> ColumnationStack<T> {
55 pub fn with_capacity(capacity: usize) -> Self {
60 Self {
61 local: Vec::with_capacity(capacity),
62 inner: T::InnerRegion::default(),
63 }
64 }
65
66 #[inline(always)]
71 pub fn reserve_items<'a, I>(&mut self, items: I)
72 where
73 I: Iterator<Item = &'a T> + Clone,
74 T: 'a,
75 {
76 self.local.reserve(items.clone().count());
77 self.inner.reserve_items(items);
78 }
79
80 #[inline(always)]
85 pub fn reserve_regions<'a, I>(&mut self, regions: I)
86 where
87 Self: 'a,
88 I: Iterator<Item = &'a Self> + Clone,
89 {
90 self.local
91 .reserve(regions.clone().map(|cs| cs.local.len()).sum());
92 self.inner.reserve_regions(regions.map(|cs| &cs.inner));
93 }
94
95 pub fn copy(&mut self, item: &T) {
99 unsafe {
100 self.local.push(self.inner.copy(item));
101 }
102 }
103
104 pub fn clear(&mut self) {
106 unsafe {
107 self.local.set_len(0);
108 self.inner.clear();
109 }
110 }
111
112 pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
116 let mut write_position = index;
117 for position in index..self.local.len() {
118 if predicate(&self[position]) {
119 self.local.swap(position, write_position);
120 write_position += 1;
121 }
122 }
123 unsafe {
124 self.local.set_len(write_position);
127 }
128 }
129
130 pub unsafe fn local(&mut self) -> &mut [T] {
136 &mut self.local[..]
137 }
138
139 #[inline]
141 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
142 let size_of = std::mem::size_of::<T>();
143 callback(self.local.len() * size_of, self.local.capacity() * size_of);
144 self.inner.heap_size(callback);
145 }
146
147 #[inline]
149 pub fn summed_heap_size(&self) -> (usize, usize) {
150 let (mut length, mut capacity) = (0, 0);
151 self.heap_size(|len, cap| {
152 length += len;
153 capacity += cap
154 });
155 (length, capacity)
156 }
157
158 #[inline]
160 pub fn len(&self) -> usize {
161 self.local.len()
162 }
163
164 pub fn is_empty(&self) -> bool {
166 self.local.is_empty()
167 }
168
169 #[inline]
171 pub fn capacity(&self) -> usize {
172 self.local.capacity()
173 }
174
175 #[inline]
177 pub fn reserve(&mut self, additional: usize) {
178 self.local.reserve(additional)
179 }
180}
181
182impl<A: Columnation, B: Columnation> ColumnationStack<(A, B)> {
183 pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
185 unsafe {
186 self.local.push(self.inner.copy_destructured(t1, t2));
187 }
188 }
189}
190
191impl<A: Columnation, B: Columnation, C: Columnation> ColumnationStack<(A, B, C)> {
192 pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
194 unsafe {
195 self.local.push(self.inner.copy_destructured(r0, r1, r2));
196 }
197 }
198}
199
200impl<T: Columnation> std::ops::Deref for ColumnationStack<T> {
201 type Target = [T];
202 #[inline(always)]
203 fn deref(&self) -> &Self::Target {
204 &self.local[..]
205 }
206}
207
208impl<T: Columnation> Drop for ColumnationStack<T> {
209 fn drop(&mut self) {
210 self.clear();
211 }
212}
213
214impl<T: Columnation> Default for ColumnationStack<T> {
215 fn default() -> Self {
216 Self {
217 local: Vec::new(),
218 inner: T::InnerRegion::default(),
219 }
220 }
221}
222
223impl<'a, A: 'a + Columnation> FromIterator<&'a A> for ColumnationStack<A> {
224 fn from_iter<I: IntoIterator<Item = &'a A>>(iter: I) -> Self {
225 let iter = iter.into_iter();
226 let mut c = ColumnationStack::<A>::with_capacity(iter.size_hint().0);
227 for element in iter {
228 c.copy(element);
229 }
230 c
231 }
232}
233
234impl<T: Columnation + PartialEq> PartialEq for ColumnationStack<T> {
235 fn eq(&self, other: &Self) -> bool {
236 PartialEq::eq(&self[..], &other[..])
237 }
238}
239
240impl<T: Columnation + Eq> Eq for ColumnationStack<T> {}
241
242impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for ColumnationStack<T> {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 self[..].fmt(f)
245 }
246}
247
248impl<T: Columnation> Clone for ColumnationStack<T> {
249 fn clone(&self) -> Self {
250 let mut new: Self = Default::default();
251 for item in &self[..] {
252 new.copy(item);
253 }
254 new
255 }
256
257 fn clone_from(&mut self, source: &Self) {
258 self.clear();
259 for item in &source[..] {
260 self.copy(item);
261 }
262 }
263}
264
265impl<T: Columnation> PushInto<T> for ColumnationStack<T> {
266 #[inline]
267 fn push_into(&mut self, item: T) {
268 self.copy(&item);
269 }
270}
271
272impl<T: Columnation> PushInto<&T> for ColumnationStack<T> {
273 #[inline]
274 fn push_into(&mut self, item: &T) {
275 self.copy(item);
276 }
277}
278
279impl<T: Columnation> PushInto<&&T> for ColumnationStack<T> {
280 #[inline]
281 fn push_into(&mut self, item: &&T) {
282 self.copy(*item);
283 }
284}
285
286impl<T: Columnation> Accountable for ColumnationStack<T> {
289 #[inline]
290 fn record_count(&self) -> i64 {
291 i64::try_from(self.local.len()).unwrap()
292 }
293 #[inline]
294 fn is_empty(&self) -> bool {
295 self.local.is_empty()
296 }
297}
298
299impl<T: Columnation> DrainContainer for ColumnationStack<T> {
300 type Item<'a>
301 = &'a T
302 where
303 Self: 'a;
304 type DrainIter<'a>
305 = std::slice::Iter<'a, T>
306 where
307 Self: 'a;
308 #[inline]
309 fn drain(&mut self) -> Self::DrainIter<'_> {
310 (*self).iter()
311 }
312}
313
314impl<T: Columnation> SizableContainer for ColumnationStack<T> {
315 fn at_capacity(&self) -> bool {
316 self.len() == self.capacity()
317 }
318 fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
319 if self.capacity() == 0 {
320 *self = stash.take().unwrap_or_default();
321 self.clear();
322 }
323 let preferred = timely::container::buffer::default_capacity::<T>();
324 if self.capacity() < preferred {
325 self.reserve(preferred - self.capacity());
326 }
327 }
328}
329
330impl<T: Clone + Ord + Columnation + 'static> BatchContainer for ColumnationStack<T> {
331 type Owned = T;
332 type ReadItem<'a> = &'a T;
333
334 #[inline(always)]
335 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
336 item.clone()
337 }
338 #[inline(always)]
339 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
340 other.clone_from(item);
341 }
342
343 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
344 item
345 }
346
347 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
348 self.push_into(item)
349 }
350 fn push_own(&mut self, item: &Self::Owned) {
351 self.push_into(item)
352 }
353
354 fn clear(&mut self) {
355 self.clear()
356 }
357
358 fn with_capacity(size: usize) -> Self {
359 Self::with_capacity(size)
360 }
361 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
362 let mut new = Self::default();
363 new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
364 new
365 }
366 fn index(&self, index: usize) -> Self::ReadItem<'_> {
367 &self[index]
368 }
369 fn len(&self) -> usize {
370 self[..].len()
371 }
372}
373
374impl<K, V, T, R> BuilderInput<K, V> for ColumnationStack<((K::Owned, V::Owned), T, R)>
375where
376 K: for<'a> BatchContainer<
377 ReadItem<'a>: PartialEq<&'a K::Owned>,
378 Owned: Ord + Columnation + Clone + 'static,
379 >,
380 V: for<'a> BatchContainer<
381 ReadItem<'a>: PartialEq<&'a V::Owned>,
382 Owned: Ord + Columnation + Clone + 'static,
383 >,
384 T: Timestamp + Lattice + Columnation + Clone + 'static,
385 R: Ord + Clone + Semigroup + Columnation + 'static,
386{
387 type Key<'a> = &'a K::Owned;
388 type Val<'a> = &'a V::Owned;
389 type Time = T;
390 type Diff = R;
391
392 fn into_parts<'a>(
393 ((key, val), time, diff): Self::Item<'a>,
394 ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
395 (key, val, time.clone(), diff.clone())
396 }
397
398 fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
399 K::reborrow(other) == *this
400 }
401
402 fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
403 V::reborrow(other) == *this
404 }
405
406 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
407 let mut keys = 0;
408 let mut vals = 0;
409 let mut upds = 0;
410 let mut prev_keyval = None;
411 for link in chain.iter() {
412 for ((key, val), _, _) in link.iter() {
413 if let Some((p_key, p_val)) = prev_keyval {
414 if p_key != key {
415 keys += 1;
416 vals += 1;
417 } else if p_val != val {
418 vals += 1;
419 }
420 } else {
421 keys += 1;
422 vals += 1;
423 }
424 upds += 1;
425 prev_keyval = Some((key, val));
426 }
427 }
428 (keys, vals, upds)
429 }
430}
431
/// Cuts streams of `Vec` updates into consolidated [`ColumnationStack`] chunks.
///
/// Incoming updates accumulate in `pending`. Whenever it fills, it is
/// consolidated and full chunks are moved to `ready`. `empty` holds the
/// container most recently handed out by `extract`/`finish`.
pub struct ColumnationChunker<T: Columnation> {
    /// Updates not yet formed into chunks.
    pending: Vec<T>,
    /// Formed chunks waiting to be extracted, oldest first.
    ready: VecDeque<ColumnationStack<T>>,
    /// Container returned (by `&mut`) from the latest `extract`/`finish` call.
    empty: Option<ColumnationStack<T>>,
}
447
448impl<T: Columnation> Default for ColumnationChunker<T> {
449 fn default() -> Self {
450 Self {
451 pending: Vec::default(),
452 ready: VecDeque::default(),
453 empty: None,
454 }
455 }
456}
457
458impl<D, T, R> ColumnationChunker<(D, T, R)>
459where
460 D: Columnation + Ord,
461 T: Columnation + Ord,
462 R: Columnation + Semigroup,
463{
464 const BUFFER_SIZE_BYTES: usize = 64 << 10;
465
466 fn chunk_capacity() -> usize {
467 let size = std::mem::size_of::<(D, T, R)>();
468 if size == 0 {
469 Self::BUFFER_SIZE_BYTES
470 } else if size <= Self::BUFFER_SIZE_BYTES {
471 Self::BUFFER_SIZE_BYTES / size
472 } else {
473 1
474 }
475 }
476
477 fn form_chunk(&mut self) {
478 consolidate_updates(&mut self.pending);
479 if self.pending.len() >= Self::chunk_capacity() {
480 while self.pending.len() > Self::chunk_capacity() {
481 let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
482 for item in self.pending.drain(..chunk.capacity()) {
483 chunk.copy(&item);
484 }
485 self.ready.push_back(chunk);
486 }
487 }
488 }
489}
490
491impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
492where
493 D: Columnation + Ord + Clone,
494 T: Columnation + Ord + Clone,
495 R: Columnation + Semigroup + Clone,
496{
497 fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
498 if self.pending.capacity() < Self::chunk_capacity() * 2 {
499 self.pending
500 .reserve(Self::chunk_capacity() * 2 - self.pending.len());
501 }
502
503 let mut drain = container.drain(..).peekable();
504 while drain.peek().is_some() {
505 self.pending
506 .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
507 if self.pending.len() == self.pending.capacity() {
508 self.form_chunk();
509 }
510 }
511 }
512}
513
514impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
515where
516 D: Columnation + Ord + Clone + 'static,
517 T: Columnation + Ord + Clone + 'static,
518 R: Columnation + Semigroup + Clone + 'static,
519{
520 type Container = ColumnationStack<(D, T, R)>;
521
522 fn extract(&mut self) -> Option<&mut Self::Container> {
523 if let Some(ready) = self.ready.pop_front() {
524 self.empty = Some(ready);
525 self.empty.as_mut()
526 } else {
527 None
528 }
529 }
530
531 fn finish(&mut self) -> Option<&mut Self::Container> {
532 consolidate_updates(&mut self.pending);
533 while !self.pending.is_empty() {
534 let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
535 for item in self
536 .pending
537 .drain(..std::cmp::min(self.pending.len(), chunk.capacity()))
538 {
539 chunk.copy(&item);
540 }
541 self.ready.push_back(chunk);
542 }
543 self.empty = self.ready.pop_front();
544 self.empty.as_mut()
545 }
546}
547
548impl<D, T, R> InternalMerge for ColumnationStack<(D, T, R)>
553where
554 D: Ord + Columnation + Clone + 'static,
555 T: Ord + Columnation + Clone + PartialOrder + 'static,
556 R: Default + Semigroup + Columnation + Clone + 'static,
557{
558 type TimeOwned = T;
559
560 fn len(&self) -> usize {
561 self[..].len()
562 }
563
564 fn clear(&mut self) {
565 ColumnationStack::clear(self)
566 }
567
568 fn account(&self) -> (usize, usize, usize, usize) {
569 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
570 let cb = |siz, cap| {
571 size += siz;
572 capacity += cap;
573 allocations += 1;
574 };
575 self.heap_size(cb);
576 (self.len(), size, capacity, allocations)
577 }
578
579 fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) {
580 use std::cmp::Ordering;
581 match others.len() {
582 0 => {}
583 1 => {
584 let other = &mut others[0];
585 let pos = &mut positions[0];
586 if self[..].is_empty() && *pos == 0 {
587 std::mem::swap(self, other);
588 return;
589 }
590 for i in *pos..other[..].len() {
591 self.copy(&other[i]);
592 }
593 *pos = other[..].len();
594 }
595 2 => {
596 let (left, right) = others.split_at_mut(1);
597 let other1 = &left[0];
598 let other2 = &right[0];
599
600 let mut stash = R::default();
601
602 while positions[0] < other1[..].len()
603 && positions[1] < other2[..].len()
604 && !self.at_capacity()
605 {
606 let (d1, t1, _) = &other1[positions[0]];
607 let (d2, t2, _) = &other2[positions[1]];
608 match (d1, t1).cmp(&(d2, t2)) {
609 Ordering::Less => {
610 self.copy(&other1[positions[0]]);
611 positions[0] += 1;
612 }
613 Ordering::Greater => {
614 self.copy(&other2[positions[1]]);
615 positions[1] += 1;
616 }
617 Ordering::Equal => {
618 let (_, _, r1) = &other1[positions[0]];
619 let (_, _, r2) = &other2[positions[1]];
620 stash.clone_from(r1);
621 stash.plus_equals(r2);
622 if !stash.is_zero() {
623 let (d, t, _) = &other1[positions[0]];
624 self.copy_destructured(d, t, &stash);
625 }
626 positions[0] += 1;
627 positions[1] += 1;
628 }
629 }
630 }
631 }
632 n => unimplemented!("{n}-way merge not yet supported"),
633 }
634 }
635
636 fn extract(
637 &mut self,
638 position: &mut usize,
639 upper: AntichainRef<T>,
640 frontier: &mut Antichain<T>,
641 keep: &mut Self,
642 ship: &mut Self,
643 ) {
644 let len = self[..].len();
645 while *position < len && !keep.at_capacity() && !ship.at_capacity() {
646 let (data, time, diff) = &self[*position];
647 if upper.less_equal(time) {
648 frontier.insert_with(time, |time| time.clone());
649 keep.copy_destructured(data, time, diff);
650 } else {
651 ship.copy_destructured(data, time, diff);
652 }
653 *position += 1;
654 }
655 }
656}
657
/// Differential dataflow's `InternalMerger` specialized to
/// [`ColumnationStack`] containers of `(D, T, R)` updates.
pub type ColInternalMerger<D, T, R> =
    differential_dataflow::trace::implementations::merge_batcher::InternalMerger<
        ColumnationStack<(D, T, R)>,
    >;