1use std::collections::VecDeque;
23use std::iter::FromIterator;
24
25use columnation::{Columnation, Region};
26use differential_dataflow::consolidation::consolidate_updates;
27use differential_dataflow::difference::Semigroup;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::implementations::merge_batcher::Merger;
30use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput};
31use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
32use timely::progress::Timestamp;
33use timely::progress::frontier::{Antichain, AntichainRef};
34use timely::{Accountable, PartialOrder};
35
36pub struct ColumnationStack<T: Columnation> {
50 local: Vec<T>,
51 inner: T::InnerRegion,
52}
53
54impl<T: Columnation> ColumnationStack<T> {
55 pub fn with_capacity(capacity: usize) -> Self {
60 Self {
61 local: Vec::with_capacity(capacity),
62 inner: T::InnerRegion::default(),
63 }
64 }
65
66 #[inline(always)]
71 pub fn reserve_items<'a, I>(&mut self, items: I)
72 where
73 I: Iterator<Item = &'a T> + Clone,
74 T: 'a,
75 {
76 self.local.reserve(items.clone().count());
77 self.inner.reserve_items(items);
78 }
79
80 #[inline(always)]
85 pub fn reserve_regions<'a, I>(&mut self, regions: I)
86 where
87 Self: 'a,
88 I: Iterator<Item = &'a Self> + Clone,
89 {
90 self.local
91 .reserve(regions.clone().map(|cs| cs.local.len()).sum());
92 self.inner.reserve_regions(regions.map(|cs| &cs.inner));
93 }
94
95 pub fn copy(&mut self, item: &T) {
99 unsafe {
100 self.local.push(self.inner.copy(item));
101 }
102 }
103
104 pub fn clear(&mut self) {
106 unsafe {
107 self.local.set_len(0);
108 self.inner.clear();
109 }
110 }
111
112 pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
116 let mut write_position = index;
117 for position in index..self.local.len() {
118 if predicate(&self[position]) {
119 self.local.swap(position, write_position);
120 write_position += 1;
121 }
122 }
123 unsafe {
124 self.local.set_len(write_position);
127 }
128 }
129
130 pub unsafe fn local(&mut self) -> &mut [T] {
136 &mut self.local[..]
137 }
138
139 #[inline]
141 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
142 let size_of = std::mem::size_of::<T>();
143 callback(self.local.len() * size_of, self.local.capacity() * size_of);
144 self.inner.heap_size(callback);
145 }
146
147 #[inline]
149 pub fn summed_heap_size(&self) -> (usize, usize) {
150 let (mut length, mut capacity) = (0, 0);
151 self.heap_size(|len, cap| {
152 length += len;
153 capacity += cap
154 });
155 (length, capacity)
156 }
157
158 #[inline]
160 pub fn len(&self) -> usize {
161 self.local.len()
162 }
163
164 pub fn is_empty(&self) -> bool {
166 self.local.is_empty()
167 }
168
169 #[inline]
171 pub fn capacity(&self) -> usize {
172 self.local.capacity()
173 }
174
175 #[inline]
177 pub fn reserve(&mut self, additional: usize) {
178 self.local.reserve(additional)
179 }
180}
181
182impl<A: Columnation, B: Columnation> ColumnationStack<(A, B)> {
183 pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
185 unsafe {
186 self.local.push(self.inner.copy_destructured(t1, t2));
187 }
188 }
189}
190
191impl<A: Columnation, B: Columnation, C: Columnation> ColumnationStack<(A, B, C)> {
192 pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
194 unsafe {
195 self.local.push(self.inner.copy_destructured(r0, r1, r2));
196 }
197 }
198}
199
200impl<T: Columnation> std::ops::Deref for ColumnationStack<T> {
201 type Target = [T];
202 #[inline(always)]
203 fn deref(&self) -> &Self::Target {
204 &self.local[..]
205 }
206}
207
208impl<T: Columnation> Drop for ColumnationStack<T> {
209 fn drop(&mut self) {
210 self.clear();
211 }
212}
213
214impl<T: Columnation> Default for ColumnationStack<T> {
215 fn default() -> Self {
216 Self {
217 local: Vec::new(),
218 inner: T::InnerRegion::default(),
219 }
220 }
221}
222
223impl<'a, A: 'a + Columnation> FromIterator<&'a A> for ColumnationStack<A> {
224 fn from_iter<I: IntoIterator<Item = &'a A>>(iter: I) -> Self {
225 let iter = iter.into_iter();
226 let mut c = ColumnationStack::<A>::with_capacity(iter.size_hint().0);
227 for element in iter {
228 c.copy(element);
229 }
230 c
231 }
232}
233
234impl<T: Columnation + PartialEq> PartialEq for ColumnationStack<T> {
235 fn eq(&self, other: &Self) -> bool {
236 PartialEq::eq(&self[..], &other[..])
237 }
238}
239
240impl<T: Columnation + Eq> Eq for ColumnationStack<T> {}
241
242impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for ColumnationStack<T> {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 self[..].fmt(f)
245 }
246}
247
248impl<T: Columnation> Clone for ColumnationStack<T> {
249 fn clone(&self) -> Self {
250 let mut new: Self = Default::default();
251 for item in &self[..] {
252 new.copy(item);
253 }
254 new
255 }
256
257 fn clone_from(&mut self, source: &Self) {
258 self.clear();
259 for item in &source[..] {
260 self.copy(item);
261 }
262 }
263}
264
265impl<T: Columnation> PushInto<T> for ColumnationStack<T> {
266 #[inline]
267 fn push_into(&mut self, item: T) {
268 self.copy(&item);
269 }
270}
271
272impl<T: Columnation> PushInto<&T> for ColumnationStack<T> {
273 #[inline]
274 fn push_into(&mut self, item: &T) {
275 self.copy(item);
276 }
277}
278
279impl<T: Columnation> PushInto<&&T> for ColumnationStack<T> {
280 #[inline]
281 fn push_into(&mut self, item: &&T) {
282 self.copy(*item);
283 }
284}
285
286impl<T: Columnation> Accountable for ColumnationStack<T> {
289 #[inline]
290 fn record_count(&self) -> i64 {
291 i64::try_from(self.local.len()).unwrap()
292 }
293 #[inline]
294 fn is_empty(&self) -> bool {
295 self.local.is_empty()
296 }
297}
298
299impl<T: Columnation> DrainContainer for ColumnationStack<T> {
300 type Item<'a>
301 = &'a T
302 where
303 Self: 'a;
304 type DrainIter<'a>
305 = std::slice::Iter<'a, T>
306 where
307 Self: 'a;
308 #[inline]
309 fn drain(&mut self) -> Self::DrainIter<'_> {
310 (*self).iter()
311 }
312}
313
314impl<T: Columnation> SizableContainer for ColumnationStack<T> {
315 fn at_capacity(&self) -> bool {
316 self.len() == self.capacity()
317 }
318 fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
319 if self.capacity() == 0 {
320 *self = stash.take().unwrap_or_default();
321 self.clear();
322 }
323 let preferred = timely::container::buffer::default_capacity::<T>();
324 if self.capacity() < preferred {
325 self.reserve(preferred - self.capacity());
326 }
327 }
328}
329
330impl<T: Clone + Ord + Columnation + 'static> BatchContainer for ColumnationStack<T> {
331 type Owned = T;
332 type ReadItem<'a> = &'a T;
333
334 #[inline(always)]
335 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
336 item.clone()
337 }
338 #[inline(always)]
339 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
340 other.clone_from(item);
341 }
342
343 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
344 item
345 }
346
347 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
348 self.push_into(item)
349 }
350 fn push_own(&mut self, item: &Self::Owned) {
351 self.push_into(item)
352 }
353
354 fn clear(&mut self) {
355 self.clear()
356 }
357
358 fn with_capacity(size: usize) -> Self {
359 Self::with_capacity(size)
360 }
361 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
362 let mut new = Self::default();
363 new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
364 new
365 }
366 fn index(&self, index: usize) -> Self::ReadItem<'_> {
367 &self[index]
368 }
369 fn len(&self) -> usize {
370 self[..].len()
371 }
372}
373
374impl<K, V, T, R> BuilderInput<K, V> for ColumnationStack<((K::Owned, V::Owned), T, R)>
375where
376 K: for<'a> BatchContainer<
377 ReadItem<'a>: PartialEq<&'a K::Owned>,
378 Owned: Ord + Columnation + Clone + 'static,
379 >,
380 V: for<'a> BatchContainer<
381 ReadItem<'a>: PartialEq<&'a V::Owned>,
382 Owned: Ord + Columnation + Clone + 'static,
383 >,
384 T: Timestamp + Lattice + Columnation + Clone + 'static,
385 R: Ord + Clone + Semigroup + Columnation + 'static,
386{
387 type Key<'a> = &'a K::Owned;
388 type Val<'a> = &'a V::Owned;
389 type Time = T;
390 type Diff = R;
391
392 fn into_parts<'a>(
393 ((key, val), time, diff): Self::Item<'a>,
394 ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
395 (key, val, time.clone(), diff.clone())
396 }
397
398 fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
399 K::reborrow(other) == *this
400 }
401
402 fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
403 V::reborrow(other) == *this
404 }
405
406 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
407 let mut keys = 0;
408 let mut vals = 0;
409 let mut upds = 0;
410 let mut prev_keyval = None;
411 for link in chain.iter() {
412 for ((key, val), _, _) in link.iter() {
413 if let Some((p_key, p_val)) = prev_keyval {
414 if p_key != key {
415 keys += 1;
416 vals += 1;
417 } else if p_val != val {
418 vals += 1;
419 }
420 } else {
421 keys += 1;
422 vals += 1;
423 }
424 upds += 1;
425 prev_keyval = Some((key, val));
426 }
427 }
428 (keys, vals, upds)
429 }
430}
431
432pub struct ColumnationChunker<T: Columnation> {
443 pending: Vec<T>,
444 ready: VecDeque<ColumnationStack<T>>,
445 empty: Option<ColumnationStack<T>>,
446}
447
448impl<T: Columnation> Default for ColumnationChunker<T> {
449 fn default() -> Self {
450 Self {
451 pending: Vec::default(),
452 ready: VecDeque::default(),
453 empty: None,
454 }
455 }
456}
457
458impl<D, T, R> ColumnationChunker<(D, T, R)>
459where
460 D: Columnation + Ord,
461 T: Columnation + Ord,
462 R: Columnation + Semigroup,
463{
464 const BUFFER_SIZE_BYTES: usize = 64 << 10;
465
466 fn chunk_capacity() -> usize {
467 let size = std::mem::size_of::<(D, T, R)>();
468 if size == 0 {
469 Self::BUFFER_SIZE_BYTES
470 } else if size <= Self::BUFFER_SIZE_BYTES {
471 Self::BUFFER_SIZE_BYTES / size
472 } else {
473 1
474 }
475 }
476
477 fn form_chunk(&mut self) {
478 consolidate_updates(&mut self.pending);
479 if self.pending.len() >= Self::chunk_capacity() {
480 while self.pending.len() > Self::chunk_capacity() {
481 let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
482 for item in self.pending.drain(..chunk.capacity()) {
483 chunk.copy(&item);
484 }
485 self.ready.push_back(chunk);
486 }
487 }
488 }
489}
490
491impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
492where
493 D: Columnation + Ord + Clone,
494 T: Columnation + Ord + Clone,
495 R: Columnation + Semigroup + Clone,
496{
497 fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
498 if self.pending.capacity() < Self::chunk_capacity() * 2 {
499 self.pending
500 .reserve(Self::chunk_capacity() * 2 - self.pending.len());
501 }
502
503 let mut drain = container.drain(..).peekable();
504 while drain.peek().is_some() {
505 self.pending
506 .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
507 if self.pending.len() == self.pending.capacity() {
508 self.form_chunk();
509 }
510 }
511 }
512}
513
514impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
515where
516 D: Columnation + Ord + Clone + 'static,
517 T: Columnation + Ord + Clone + 'static,
518 R: Columnation + Semigroup + Clone + 'static,
519{
520 type Container = ColumnationStack<(D, T, R)>;
521
522 fn extract(&mut self) -> Option<&mut Self::Container> {
523 if let Some(ready) = self.ready.pop_front() {
524 self.empty = Some(ready);
525 self.empty.as_mut()
526 } else {
527 None
528 }
529 }
530
531 fn finish(&mut self) -> Option<&mut Self::Container> {
532 consolidate_updates(&mut self.pending);
533 while !self.pending.is_empty() {
534 let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
535 for item in self
536 .pending
537 .drain(..std::cmp::min(self.pending.len(), chunk.capacity()))
538 {
539 chunk.copy(&item);
540 }
541 self.ready.push_back(chunk);
542 }
543 self.empty = self.ready.pop_front();
544 self.empty.as_mut()
545 }
546}
547
/// Merger for chains of `ColumnationStack<(D, T, R)>` chunks.
///
/// It carries no state; the marker only records the update type it works on.
pub struct ColInternalMerger<D, T, R> {
    _marker: std::marker::PhantomData<(D, T, R)>,
}
562
563impl<D, T, R> Default for ColInternalMerger<D, T, R> {
564 fn default() -> Self {
565 Self {
566 _marker: std::marker::PhantomData,
567 }
568 }
569}
570
571impl<D, T, R> ColInternalMerger<D, T, R>
572where
573 D: Ord + Columnation + Clone + 'static,
574 T: Ord + Columnation + Clone + PartialOrder + 'static,
575 R: Default + Semigroup + Columnation + Clone + 'static,
576{
577 fn chunk_capacity() -> usize {
579 const BUFFER_SIZE_BYTES: usize = 64 << 10;
580 let size = std::mem::size_of::<(D, T, R)>();
581 if size == 0 {
582 BUFFER_SIZE_BYTES
583 } else if size <= BUFFER_SIZE_BYTES {
584 BUFFER_SIZE_BYTES / size
585 } else {
586 1
587 }
588 }
589
590 fn empty(stash: &mut Vec<ColumnationStack<(D, T, R)>>) -> ColumnationStack<(D, T, R)> {
592 let target = Self::chunk_capacity();
593 match stash.pop() {
594 Some(mut chunk) if chunk.capacity() >= target => {
595 chunk.clear();
596 chunk
597 }
598 _ => ColumnationStack::with_capacity(target),
599 }
600 }
601
602 fn recycle(
604 mut chunk: ColumnationStack<(D, T, R)>,
605 stash: &mut Vec<ColumnationStack<(D, T, R)>>,
606 ) {
607 chunk.clear();
608 stash.push(chunk);
609 }
610
611 fn drain_side(
617 head: &mut ColumnationStack<(D, T, R)>,
618 pos: &mut usize,
619 list: &mut std::vec::IntoIter<ColumnationStack<(D, T, R)>>,
620 result: &mut ColumnationStack<(D, T, R)>,
621 output: &mut Vec<ColumnationStack<(D, T, R)>>,
622 stash: &mut Vec<ColumnationStack<(D, T, R)>>,
623 ) {
624 if *pos < head[..].len() {
626 if result.is_empty() && *pos == 0 {
627 std::mem::swap(result, head);
628 } else {
629 for i in *pos..head[..].len() {
630 result.copy(&head[i]);
631 }
632 }
633 *pos = head[..].len();
634 }
635 if !result.is_empty() {
637 output.push(std::mem::replace(result, Self::empty(stash)));
638 }
639 output.extend(list);
641 }
642}
643
644impl<D, T, R> Merger for ColInternalMerger<D, T, R>
645where
646 D: Ord + Columnation + Clone + 'static,
647 T: Ord + Columnation + Clone + PartialOrder + 'static,
648 R: Default + Semigroup + Columnation + Clone + 'static,
649{
650 type Chunk = ColumnationStack<(D, T, R)>;
651 type Time = T;
652
653 fn merge(
654 &mut self,
655 list1: Vec<Self::Chunk>,
656 list2: Vec<Self::Chunk>,
657 output: &mut Vec<Self::Chunk>,
658 stash: &mut Vec<Self::Chunk>,
659 ) {
660 use std::cmp::Ordering;
661
662 let mut list1 = list1.into_iter();
663 let mut list2 = list2.into_iter();
664 let mut head1 = list1.next().unwrap_or_default();
665 let mut head2 = list2.next().unwrap_or_default();
666 let mut pos1 = 0;
667 let mut pos2 = 0;
668 let mut result = Self::empty(stash);
669 let mut diff = R::default();
670
671 while pos1 < head1[..].len() && pos2 < head2[..].len() {
673 while pos1 < head1[..].len() && pos2 < head2[..].len() && !result.at_capacity() {
675 let (d1, t1, r1) = &head1[pos1];
676 let (d2, t2, r2) = &head2[pos2];
677 match (d1, t1).cmp(&(d2, t2)) {
678 Ordering::Less => {
679 result.copy(&head1[pos1]);
680 pos1 += 1;
681 }
682 Ordering::Greater => {
683 result.copy(&head2[pos2]);
684 pos2 += 1;
685 }
686 Ordering::Equal => {
687 diff.clone_from(r1);
688 diff.plus_equals(r2);
689 if !diff.is_zero() {
690 result.copy_destructured(d1, t1, &diff);
691 }
692 pos1 += 1;
693 pos2 += 1;
694 }
695 }
696 }
697 if result.at_capacity() {
698 output.push(std::mem::replace(&mut result, Self::empty(stash)));
699 }
700 if pos1 >= head1[..].len() {
701 let old = std::mem::replace(&mut head1, list1.next().unwrap_or_default());
702 Self::recycle(old, stash);
703 pos1 = 0;
704 }
705 if pos2 >= head2[..].len() {
706 let old = std::mem::replace(&mut head2, list2.next().unwrap_or_default());
707 Self::recycle(old, stash);
708 pos2 = 0;
709 }
710 }
711
712 Self::drain_side(
716 &mut head1,
717 &mut pos1,
718 &mut list1,
719 &mut result,
720 output,
721 stash,
722 );
723 Self::drain_side(
724 &mut head2,
725 &mut pos2,
726 &mut list2,
727 &mut result,
728 output,
729 stash,
730 );
731
732 if !result.is_empty() {
733 output.push(result);
734 }
735 }
736
737 fn extract(
738 &mut self,
739 merged: Vec<Self::Chunk>,
740 upper: AntichainRef<T>,
741 frontier: &mut Antichain<T>,
742 readied: &mut Vec<Self::Chunk>,
743 kept: &mut Vec<Self::Chunk>,
744 stash: &mut Vec<Self::Chunk>,
745 ) {
746 let mut keep = Self::empty(stash);
747 let mut ready = Self::empty(stash);
748
749 for chunk in merged {
750 let len = chunk[..].len();
751 for i in 0..len {
752 let (data, time, diff) = &chunk[i];
753 if upper.less_equal(time) {
754 frontier.insert_with(time, |time| time.clone());
755 keep.copy_destructured(data, time, diff);
756 } else {
757 ready.copy_destructured(data, time, diff);
758 }
759 if keep.at_capacity() {
760 kept.push(std::mem::replace(&mut keep, Self::empty(stash)));
761 }
762 if ready.at_capacity() {
763 readied.push(std::mem::replace(&mut ready, Self::empty(stash)));
764 }
765 }
766 Self::recycle(chunk, stash);
768 }
769
770 if !keep.is_empty() {
771 kept.push(keep);
772 }
773 if !ready.is_empty() {
774 readied.push(ready);
775 }
776 }
777
778 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
779 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
780 chunk.heap_size(|siz, cap| {
781 size += siz;
782 capacity += cap;
783 allocations += 1;
784 });
785 (chunk[..].len(), size, capacity, allocations)
786 }
787}