1use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
/// A batcher that buffers updates as chains of chunks and, on `seal`, splits the merged
/// updates into those handed to a `Builder` and those held back for later batches.
///
/// `Input` is the container type accepted by `push_container`, `C` builds `M::Chunk`s from
/// those inputs, and `M` is the strategy used to merge chains and split merged data.
pub struct MergeBatcher<Input, C, M: Merger> {
    /// Builds chunks from pushed input containers; each completed chunk becomes a new chain.
    chunker: C,
    /// Chains of chunks awaiting merging. `insert_chain` merges the two newest chains whenever
    /// the newer one holds at least half as many chunks as the older one, which keeps chain
    /// lengths roughly geometrically decreasing.
    chains: Vec<Vec<M::Chunk>>,
    /// Cleared chunks available for reuse by the merger; emptied at the end of every `seal`.
    stash: Vec<M::Chunk>,
    /// Strategy that merges chains and splits merged data at seal time.
    merger: M,
    /// Lower bound of the next sealed batch: the `upper` given to the previous `seal`
    /// (initially the minimum time).
    lower: Antichain<M::Time>,
    /// Times of the updates held back by the most recent `seal`, as filled in by
    /// `Merger::extract`.
    frontier: Antichain<M::Time>,
    /// Optional logger receiving `BatcherEvent` size-accounting records.
    logger: Option<Logger>,
    /// Operator identifier reported in logged events.
    operator_id: usize,
    /// Marks the `Input` type parameter without storing a value of it.
    _marker: PhantomData<Input>,
}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51 C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52 M: Merger<Time: Timestamp>,
53{
54 type Input = Input;
55 type Time = M::Time;
56 type Output = M::Chunk;
57
58 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59 Self {
60 logger,
61 operator_id,
62 chunker: C::default(),
63 merger: M::default(),
64 chains: Vec::new(),
65 stash: Vec::new(),
66 frontier: Antichain::new(),
67 lower: Antichain::from_elem(M::Time::minimum()),
68 _marker: PhantomData,
69 }
70 }
71
72 fn push_container(&mut self, container: &mut Input) {
75 self.chunker.push_into(container);
76 while let Some(chunk) = self.chunker.extract() {
77 let chunk = std::mem::take(chunk);
78 self.insert_chain(vec![chunk]);
79 }
80 }
81
82 fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87 while let Some(chunk) = self.chunker.finish() {
89 let chunk = std::mem::take(chunk);
90 self.insert_chain(vec![chunk]);
91 }
92
93 while self.chains.len() > 1 {
95 let list1 = self.chain_pop().unwrap();
96 let list2 = self.chain_pop().unwrap();
97 let merged = self.merge_by(list1, list2);
98 self.chain_push(merged);
99 }
100 let merged = self.chain_pop().unwrap_or_default();
101
102 let mut kept = Vec::new();
104 let mut readied = Vec::new();
105 self.frontier.clear();
106
107 self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109 if !kept.is_empty() {
110 self.chain_push(kept);
111 }
112
113 self.stash.clear();
114
115 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116 let seal = B::seal(&mut readied, description);
117 self.lower = upper;
118 seal
119 }
120
121 #[inline]
123 fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124 self.frontier.borrow()
125 }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132 if !chain.is_empty() {
133 self.chain_push(chain);
134 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135 let list1 = self.chain_pop().unwrap();
136 let list2 = self.chain_pop().unwrap();
137 let merged = self.merge_by(list1, list2);
138 self.chain_push(merged);
139 }
140 }
141 }
142
143 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145 let mut output = Vec::with_capacity(list1.len() + list2.len());
147 self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149 output
150 }
151
152 #[inline]
154 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155 let chain = self.chains.pop();
156 self.account(chain.iter().flatten().map(M::account), -1);
157 chain
158 }
159
160 #[inline]
162 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163 self.account(chain.iter().map(M::account), 1);
164 self.chains.push(chain);
165 }
166
167 #[inline]
172 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173 if let Some(logger) = &self.logger {
174 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175 for (records_, size_, capacity_, allocations_) in items {
176 records = records.saturating_add_unsigned(records_);
177 size = size.saturating_add_unsigned(size_);
178 capacity = capacity.saturating_add_unsigned(capacity_);
179 allocations = allocations.saturating_add_unsigned(allocations_);
180 }
181 logger.log(BatcherEvent {
182 operator: self.operator_id,
183 records_diff: records * diff,
184 size_diff: size * diff,
185 capacity_diff: capacity * diff,
186 allocations_diff: allocations * diff,
187 })
188 }
189 }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193 fn drop(&mut self) {
194 while self.chain_pop().is_some() {}
196 }
197}
198
199pub trait Merger: Default {
201 type Chunk: Default;
203 type Time;
205 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
207 fn extract(
209 &mut self,
210 merged: Vec<Self::Chunk>,
211 upper: AntichainRef<Self::Time>,
212 frontier: &mut Antichain<Self::Time>,
213 readied: &mut Vec<Self::Chunk>,
214 kept: &mut Vec<Self::Chunk>,
215 stash: &mut Vec<Self::Chunk>,
216 );
217
218 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
220}
221
222pub use container::{VecMerger, ColMerger};
223
224pub mod container {
225
226 use std::cmp::Ordering;
239 use std::marker::PhantomData;
240 use timely::container::{PushInto, SizableContainer};
241 use timely::progress::frontier::{Antichain, AntichainRef};
242 use timely::{Accountable, Data, PartialOrder};
243 use timely::container::DrainContainer;
244 use crate::trace::implementations::merge_batcher::Merger;
245
246 pub trait ContainerQueue<C: DrainContainer> {
248 fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
250 fn is_empty(&self) -> bool;
252 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
254 fn from(container: C) -> Self;
256 }
257
258 pub trait MergerChunk : Accountable + DrainContainer + SizableContainer + Default {
260 type TimeOwned;
265 type DiffOwned: Default;
270
271 fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;
275
276 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);
281
282 fn account(&self) -> (usize, usize, usize, usize) {
285 let (size, capacity, allocations) = (0, 0, 0);
286 (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
287 }
288
289 fn clear(&mut self);
291 }
292
293 pub struct ContainerMerger<MC, CQ> {
298 _marker: PhantomData<(MC, CQ)>,
299 }
300
301 impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
302 fn default() -> Self {
303 Self { _marker: PhantomData, }
304 }
305 }
306
307 impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
308 #[inline]
310 fn empty(&self, stash: &mut Vec<MC>) -> MC {
311 stash.pop().unwrap_or_else(|| {
312 let mut container = MC::default();
313 container.ensure_capacity(&mut None);
314 container
315 })
316 }
317 #[inline]
319 fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
320 chunk.clear();
322 stash.push(chunk);
323 }
324 }
325
    /// `Merger` implementation that walks chunks through `CQ` queues and writes into `MC` chunks.
    impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
    where
        for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data> + Clone + PushInto<<MC as DrainContainer>::Item<'a>> + 'static,
        CQ: ContainerQueue<MC>,
    {
        type Time = MC::TimeOwned;
        type Chunk = MC;

        /// Merges the chunks of `list1` and `list2` into `output`, ordering items by
        /// `cmp_heads`. Items whose heads compare equal are combined with `push_and_add`.
        /// Output chunks are taken from `stash` when possible, and input chunks consumed during
        /// the main loop are recycled into it.
        fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
            let mut list1 = list1.into_iter();
            let mut list2 = list2.into_iter();

            // An exhausted list yields an empty default chunk, so its queue starts out empty.
            let mut head1 = CQ::from(list1.next().unwrap_or_default());
            let mut head2 = CQ::from(list2.next().unwrap_or_default());

            let mut result = self.empty(stash);

            // Scratch diff handed to `push_and_add`.
            let mut diff_owned = Default::default();

            // Merge while both inputs still have items.
            while !head1.is_empty() && !head2.is_empty() {
                // Fill `result` until it is full or one head runs dry.
                while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
                    let cmp = head1.cmp_heads(&head2);
                    match cmp {
                        Ordering::Less => {
                            result.push_into(head1.next_or_alloc().ok().unwrap());
                        }
                        Ordering::Greater => {
                            result.push_into(head2.next_or_alloc().ok().unwrap());
                        }
                        Ordering::Equal => {
                            // Consume both heads and let the chunk combine them.
                            let item1 = head1.next_or_alloc().ok().unwrap();
                            let item2 = head2.next_or_alloc().ok().unwrap();
                            result.push_and_add(item1, item2, &mut diff_owned);
                        }
                    }
                }

                if result.at_capacity() {
                    output.push_into(result);
                    result = self.empty(stash);
                }

                // On an empty queue `next_or_alloc` returns `Err(container)`; recycle that
                // container and advance to the next chunk of the same list.
                if head1.is_empty() {
                    self.recycle(head1.next_or_alloc().err().unwrap(), stash);
                    head1 = CQ::from(list1.next().unwrap_or_default());
                }
                if head2.is_empty() {
                    self.recycle(head2.next_or_alloc().err().unwrap(), stash);
                    head2 = CQ::from(list2.next().unwrap_or_default());
                }
            }

            // At most one input has data left. Drain what remains of `head1`.
            // NOTE(review): the `Err` container returned when `head1` runs dry is discarded
            // here rather than recycled.
            while let Ok(next) = head1.next_or_alloc() {
                result.push_into(next);
                if result.at_capacity() {
                    output.push_into(result);
                    result = self.empty(stash);
                }
            }
            if !result.is_empty() {
                output.push_into(result);
                result = self.empty(stash);
            }
            // Remaining whole chunks of `list1` are moved over without copying.
            output.extend(list1);

            // Same for the second input; its spent head container is likewise discarded.
            while let Ok(next) = head2.next_or_alloc() {
                result.push_into(next);
                if result.at_capacity() {
                    output.push(result);
                    result = self.empty(stash);
                }
            }
            if !result.is_empty() {
                output.push_into(result);
            }
            output.extend(list2);
        }

        /// Drains every chunk of `merged` and routes each item by `MC::time_kept`. Items
        /// reported as kept go to chunks in `kept`; all others go to chunks in `readied`. Only
        /// non-empty chunks are emitted, and drained input chunks are recycled into `stash`.
        fn extract(
            &mut self,
            merged: Vec<Self::Chunk>,
            upper: AntichainRef<Self::Time>,
            frontier: &mut Antichain<Self::Time>,
            readied: &mut Vec<Self::Chunk>,
            kept: &mut Vec<Self::Chunk>,
            stash: &mut Vec<Self::Chunk>,
        ) {
            let mut keep = self.empty(stash);
            let mut ready = self.empty(stash);

            for mut buffer in merged {
                for item in buffer.drain() {
                    if MC::time_kept(&item, &upper, frontier) {
                        // The `!is_empty()` check keeps a chunk that reports full while empty
                        // from being emitted as an empty chunk.
                        if keep.at_capacity() && !keep.is_empty() {
                            kept.push(keep);
                            keep = self.empty(stash);
                        }
                        keep.push_into(item);
                    } else {
                        if ready.at_capacity() && !ready.is_empty() {
                            readied.push(ready);
                            ready = self.empty(stash);
                        }
                        ready.push_into(item);
                    }
                }
                self.recycle(buffer, stash);
            }
            // Emit the partially filled tail chunks, if any.
            if !keep.is_empty() {
                kept.push(keep);
            }
            if !ready.is_empty() {
                readied.push(ready);
            }
        }

        /// Delegates to `MergerChunk::account`.
        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
            chunk.account()
        }
    }
457
458 pub use vec::VecMerger;
459 pub mod vec {
461
462 use std::collections::VecDeque;
463 use timely::progress::{Antichain, frontier::AntichainRef};
464 use crate::difference::Semigroup;
465 use super::{ContainerQueue, MergerChunk};
466
467 pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
469
470 impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
471 fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
472 if self.is_empty() {
473 Err(Vec::from(std::mem::take(self)))
474 }
475 else {
476 Ok(self.pop_front().unwrap())
477 }
478 }
479 fn is_empty(&self) -> bool {
480 self.is_empty()
481 }
482 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
483 let (data1, time1, _) = self.front().unwrap();
484 let (data2, time2, _) = other.front().unwrap();
485 (data1, time1).cmp(&(data2, time2))
486 }
487 fn from(list: Vec<(D, T, R)>) -> Self {
488 <Self as From<_>>::from(list)
489 }
490 }
491
492 impl<D: Ord + 'static, T: Ord + timely::PartialOrder + Clone + 'static, R: Semigroup + 'static> MergerChunk for Vec<(D, T, R)> {
493 type TimeOwned = T;
494 type DiffOwned = ();
495
496 fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
497 if upper.less_equal(time) {
498 frontier.insert_with(&time, |time| time.clone());
499 true
500 }
501 else { false }
502 }
503 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) {
504 let (data, time, mut diff1) = item1;
505 let (_data, _time, diff2) = item2;
506 diff1.plus_equals(&diff2);
507 if !diff1.is_zero() {
508 self.push((data, time, diff1));
509 }
510 }
511 fn account(&self) -> (usize, usize, usize, usize) {
512 let (size, capacity, allocations) = (0, 0, 0);
513 (self.len(), size, capacity, allocations)
514 }
515 #[inline] fn clear(&mut self) { Vec::clear(self) }
516 }
517 }
518
519 pub use columnation::ColMerger;
520 pub mod columnation {
522
523 use timely::progress::{Antichain, frontier::AntichainRef};
524 use columnation::Columnation;
525
526 use crate::containers::TimelyStack;
527 use crate::difference::Semigroup;
528
529 use super::{ContainerQueue, MergerChunk};
530
531 pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;
533
534 pub struct TimelyStackQueue<T: Columnation> {
536 list: TimelyStack<T>,
537 head: usize,
538 }
539
540 impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation> ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)> {
541 fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
542 if self.is_empty() {
543 Err(std::mem::take(&mut self.list))
544 }
545 else {
546 Ok(self.pop())
547 }
548 }
549 fn is_empty(&self) -> bool {
550 self.head == self.list[..].len()
551 }
552 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
553 let (data1, time1, _) = self.peek();
554 let (data2, time2, _) = other.peek();
555 (data1, time1).cmp(&(data2, time2))
556 }
557 fn from(list: TimelyStack<(D, T, R)>) -> Self {
558 TimelyStackQueue { list, head: 0 }
559 }
560 }
561
562 impl<T: Columnation> TimelyStackQueue<T> {
563 fn pop(&mut self) -> &T {
564 self.head += 1;
565 &self.list[self.head - 1]
566 }
567
568 fn peek(&self) -> &T {
569 &self.list[self.head]
570 }
571 }
572
573 impl<D: Ord + Columnation + 'static, T: Ord + timely::PartialOrder + Clone + Columnation + 'static, R: Default + Semigroup + Columnation + 'static> MergerChunk for TimelyStack<(D, T, R)> {
574 type TimeOwned = T;
575 type DiffOwned = R;
576
577 fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
578 if upper.less_equal(time) {
579 frontier.insert_with(&time, |time| time.clone());
580 true
581 }
582 else { false }
583 }
584 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) {
585 let (data, time, diff1) = item1;
586 let (_data, _time, diff2) = item2;
587 stash.clone_from(diff1);
588 stash.plus_equals(&diff2);
589 if !stash.is_zero() {
590 self.copy_destructured(data, time, stash);
591 }
592 }
593 fn account(&self) -> (usize, usize, usize, usize) {
594 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
595 let cb = |siz, cap| {
596 size += siz;
597 capacity += cap;
598 allocations += 1;
599 };
600 self.heap_size(cb);
601 (self.len(), size, capacity, allocations)
602 }
603 #[inline] fn clear(&mut self) { TimelyStack::clear(self) }
604 }
605 }
606}