1use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::Container;
18use timely::container::{ContainerBuilder, PushInto};
19
20use crate::logging::{BatcherEvent, Logger};
21use crate::trace::{Batcher, Builder, Description};
22
23pub struct MergeBatcher<Input, C, M: Merger> {
28 chunker: C,
30 chains: Vec<Vec<M::Chunk>>,
34 stash: Vec<M::Chunk>,
36 merger: M,
38 lower: Antichain<M::Time>,
40 frontier: Antichain<M::Time>,
42 logger: Option<Logger>,
44 operator_id: usize,
46 _marker: PhantomData<Input>,
48}
49
50impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
51where
52 C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
53 M: Merger,
54 M::Time: Timestamp,
55{
56 type Input = Input;
57 type Time = M::Time;
58 type Output = M::Chunk;
59
60 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
61 Self {
62 logger,
63 operator_id,
64 chunker: C::default(),
65 merger: M::default(),
66 chains: Vec::new(),
67 stash: Vec::new(),
68 frontier: Antichain::new(),
69 lower: Antichain::from_elem(M::Time::minimum()),
70 _marker: PhantomData,
71 }
72 }
73
74 fn push_container(&mut self, container: &mut Input) {
77 self.chunker.push_into(container);
78 while let Some(chunk) = self.chunker.extract() {
79 let chunk = std::mem::take(chunk);
80 self.insert_chain(vec![chunk]);
81 }
82 }
83
84 fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
89 while let Some(chunk) = self.chunker.finish() {
91 let chunk = std::mem::take(chunk);
92 self.insert_chain(vec![chunk]);
93 }
94
95 while self.chains.len() > 1 {
97 let list1 = self.chain_pop().unwrap();
98 let list2 = self.chain_pop().unwrap();
99 let merged = self.merge_by(list1, list2);
100 self.chain_push(merged);
101 }
102 let merged = self.chain_pop().unwrap_or_default();
103
104 let mut kept = Vec::new();
106 let mut readied = Vec::new();
107 self.frontier.clear();
108
109 self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
110
111 if !kept.is_empty() {
112 self.chain_push(kept);
113 }
114
115 self.stash.clear();
116
117 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
118 let seal = B::seal(&mut readied, description);
119 self.lower = upper;
120 seal
121 }
122
123 #[inline]
125 fn frontier(&mut self) -> AntichainRef<M::Time> {
126 self.frontier.borrow()
127 }
128}
129
130impl<Input, C, M> MergeBatcher<Input, C, M>
131where
132 M: Merger,
133{
134 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
137 if !chain.is_empty() {
138 self.chain_push(chain);
139 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
140 let list1 = self.chain_pop().unwrap();
141 let list2 = self.chain_pop().unwrap();
142 let merged = self.merge_by(list1, list2);
143 self.chain_push(merged);
144 }
145 }
146 }
147
148 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
150 let mut output = Vec::with_capacity(list1.len() + list2.len());
152 self.merger.merge(list1, list2, &mut output, &mut self.stash);
153
154 output
155 }
156
157 #[inline]
159 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
160 let chain = self.chains.pop();
161 self.account(chain.iter().flatten().map(M::account), -1);
162 chain
163 }
164
165 #[inline]
167 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
168 self.account(chain.iter().map(M::account), 1);
169 self.chains.push(chain);
170 }
171
172 #[inline]
177 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
178 if let Some(logger) = &self.logger {
179 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
180 for (records_, size_, capacity_, allocations_) in items {
181 records = records.saturating_add_unsigned(records_);
182 size = size.saturating_add_unsigned(size_);
183 capacity = capacity.saturating_add_unsigned(capacity_);
184 allocations = allocations.saturating_add_unsigned(allocations_);
185 }
186 logger.log(BatcherEvent {
187 operator: self.operator_id,
188 records_diff: records * diff,
189 size_diff: size * diff,
190 capacity_diff: capacity * diff,
191 allocations_diff: allocations * diff,
192 })
193 }
194 }
195}
196
197impl<Input, C, M> Drop for MergeBatcher<Input, C, M>
198where
199 M: Merger,
200{
201 fn drop(&mut self) {
202 while self.chain_pop().is_some() {}
204 }
205}
206
207pub trait Merger: Default {
209 type Chunk: Container;
211 type Time;
213 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
215 fn extract(
217 &mut self,
218 merged: Vec<Self::Chunk>,
219 upper: AntichainRef<Self::Time>,
220 frontier: &mut Antichain<Self::Time>,
221 readied: &mut Vec<Self::Chunk>,
222 kept: &mut Vec<Self::Chunk>,
223 stash: &mut Vec<Self::Chunk>,
224 );
225
226 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
228}
229
230pub use container::{VecMerger, ColMerger};
231
232pub mod container {
233
234 use std::cmp::Ordering;
247 use std::marker::PhantomData;
248 use timely::{Container, container::{PushInto, SizableContainer}};
249 use timely::progress::frontier::{Antichain, AntichainRef};
250 use timely::{Data, PartialOrder};
251
252 use crate::trace::implementations::merge_batcher::Merger;
253
254 pub trait ContainerQueue<C: Container> {
256 fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
258 fn is_empty(&self) -> bool;
260 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
262 fn from(container: C) -> Self;
264 }
265
266 pub trait MergerChunk : SizableContainer {
268 type TimeOwned;
273 type DiffOwned: Default;
278
279 fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;
283
284 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);
289
290 fn account(&self) -> (usize, usize, usize, usize) {
293 let (size, capacity, allocations) = (0, 0, 0);
294 (self.len(), size, capacity, allocations)
295 }
296 }
297
/// A merger for chunks of type `MC`, read through queues of type `CQ`.
///
/// The type holds no data; both parameters are recorded only through `PhantomData`.
pub struct ContainerMerger<MC, CQ> {
    /// Ties the otherwise unused type parameters to the struct.
    _marker: PhantomData<(MC, CQ)>,
}

impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
    /// Creates the merger; implemented by hand so `MC` and `CQ` need not be `Default`.
    fn default() -> Self {
        ContainerMerger {
            _marker: PhantomData,
        }
    }
}
311
312 impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
313 #[inline]
315 fn empty(&self, stash: &mut Vec<MC>) -> MC {
316 stash.pop().unwrap_or_else(|| {
317 let mut container = MC::default();
318 container.ensure_capacity(&mut None);
319 container
320 })
321 }
322 #[inline]
324 fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
325 chunk.clear();
327 stash.push(chunk);
328 }
329 }
330
331 impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
332 where
333 for<'a> MC: MergerChunk + Clone + PushInto<<MC as Container>::Item<'a>> + 'static,
334 for<'a> MC::TimeOwned: Ord + PartialOrder + Data,
335 CQ: ContainerQueue<MC>,
336 {
337 type Time = MC::TimeOwned;
338 type Chunk = MC;
339
340 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
342 let mut list1 = list1.into_iter();
343 let mut list2 = list2.into_iter();
344
345 let mut head1 = CQ::from(list1.next().unwrap_or_default());
346 let mut head2 = CQ::from(list2.next().unwrap_or_default());
347
348 let mut result = self.empty(stash);
349
350 let mut diff_owned = Default::default();
351
352 while !head1.is_empty() && !head2.is_empty() {
354 while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
355 let cmp = head1.cmp_heads(&head2);
356 match cmp {
360 Ordering::Less => {
361 result.push_into(head1.next_or_alloc().ok().unwrap());
362 }
363 Ordering::Greater => {
364 result.push_into(head2.next_or_alloc().ok().unwrap());
365 }
366 Ordering::Equal => {
367 let item1 = head1.next_or_alloc().ok().unwrap();
368 let item2 = head2.next_or_alloc().ok().unwrap();
369 result.push_and_add(item1, item2, &mut diff_owned);
370 }
371 }
372 }
373
374 if result.at_capacity() {
375 output.push_into(result);
376 result = self.empty(stash);
377 }
378
379 if head1.is_empty() {
380 self.recycle(head1.next_or_alloc().err().unwrap(), stash);
381 head1 = CQ::from(list1.next().unwrap_or_default());
382 }
383 if head2.is_empty() {
384 self.recycle(head2.next_or_alloc().err().unwrap(), stash);
385 head2 = CQ::from(list2.next().unwrap_or_default());
386 }
387 }
388
389 while let Ok(next) = head1.next_or_alloc() {
391 result.push_into(next);
392 if result.at_capacity() {
393 output.push_into(result);
394 result = self.empty(stash);
395 }
396 }
397 if !result.is_empty() {
398 output.push_into(result);
399 result = self.empty(stash);
400 }
401 output.extend(list1);
402
403 while let Ok(next) = head2.next_or_alloc() {
405 result.push_into(next);
406 if result.at_capacity() {
407 output.push(result);
408 result = self.empty(stash);
409 }
410 }
411 if !result.is_empty() {
412 output.push_into(result);
413 }
415 output.extend(list2);
416 }
417
418 fn extract(
419 &mut self,
420 merged: Vec<Self::Chunk>,
421 upper: AntichainRef<Self::Time>,
422 frontier: &mut Antichain<Self::Time>,
423 readied: &mut Vec<Self::Chunk>,
424 kept: &mut Vec<Self::Chunk>,
425 stash: &mut Vec<Self::Chunk>,
426 ) {
427 let mut keep = self.empty(stash);
428 let mut ready = self.empty(stash);
429
430 for mut buffer in merged {
431 for item in buffer.drain() {
432 if MC::time_kept(&item, &upper, frontier) {
433 if keep.at_capacity() && !keep.is_empty() {
434 kept.push(keep);
435 keep = self.empty(stash);
436 }
437 keep.push_into(item);
438 } else {
439 if ready.at_capacity() && !ready.is_empty() {
440 readied.push(ready);
441 ready = self.empty(stash);
442 }
443 ready.push_into(item);
444 }
445 }
446 self.recycle(buffer, stash);
448 }
449 if !keep.is_empty() {
451 kept.push(keep);
452 }
453 if !ready.is_empty() {
454 readied.push(ready);
455 }
456 }
457
458 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
460 chunk.account()
461 }
462 }
463
464 pub use vec::VecMerger;
465 pub mod vec {
467
468 use std::collections::VecDeque;
469 use timely::progress::{Antichain, frontier::AntichainRef};
470 use crate::difference::Semigroup;
471 use super::{ContainerQueue, MergerChunk};
472
473 pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
475
476 impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
477 fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
478 if self.is_empty() {
479 Err(Vec::from(std::mem::take(self)))
480 }
481 else {
482 Ok(self.pop_front().unwrap())
483 }
484 }
485 fn is_empty(&self) -> bool {
486 self.is_empty()
487 }
488 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
489 let (data1, time1, _) = self.front().unwrap();
490 let (data2, time2, _) = other.front().unwrap();
491 (data1, time1).cmp(&(data2, time2))
492 }
493 fn from(list: Vec<(D, T, R)>) -> Self {
494 <Self as From<_>>::from(list)
495 }
496 }
497
498 impl<D: Ord + 'static, T: Ord + timely::PartialOrder + Clone + 'static, R: Semigroup + 'static> MergerChunk for Vec<(D, T, R)> {
499 type TimeOwned = T;
500 type DiffOwned = ();
501
502 fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
503 if upper.less_equal(time) {
504 frontier.insert_with(&time, |time| time.clone());
505 true
506 }
507 else { false }
508 }
509 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) {
510 let (data, time, mut diff1) = item1;
511 let (_data, _time, diff2) = item2;
512 diff1.plus_equals(&diff2);
513 if !diff1.is_zero() {
514 self.push((data, time, diff1));
515 }
516 }
517 fn account(&self) -> (usize, usize, usize, usize) {
518 let (size, capacity, allocations) = (0, 0, 0);
519 (self.len(), size, capacity, allocations)
520 }
521 }
522 }
523
524 pub use columnation::ColMerger;
525 pub mod columnation {
527
528 use timely::progress::{Antichain, frontier::AntichainRef};
529 use columnation::Columnation;
530
531 use crate::containers::TimelyStack;
532 use crate::difference::Semigroup;
533
534 use super::{ContainerQueue, MergerChunk};
535
536 pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;
538
539 pub struct TimelyStackQueue<T: Columnation> {
541 list: TimelyStack<T>,
542 head: usize,
543 }
544
545 impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation> ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)> {
546 fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
547 if self.is_empty() {
548 Err(std::mem::take(&mut self.list))
549 }
550 else {
551 Ok(self.pop())
552 }
553 }
554 fn is_empty(&self) -> bool {
555 self.head == self.list[..].len()
556 }
557 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
558 let (data1, time1, _) = self.peek();
559 let (data2, time2, _) = other.peek();
560 (data1, time1).cmp(&(data2, time2))
561 }
562 fn from(list: TimelyStack<(D, T, R)>) -> Self {
563 TimelyStackQueue { list, head: 0 }
564 }
565 }
566
567 impl<T: Columnation> TimelyStackQueue<T> {
568 fn pop(&mut self) -> &T {
569 self.head += 1;
570 &self.list[self.head - 1]
571 }
572
573 fn peek(&self) -> &T {
574 &self.list[self.head]
575 }
576 }
577
578 impl<D: Ord + Columnation + 'static, T: Ord + timely::PartialOrder + Clone + Columnation + 'static, R: Default + Semigroup + Columnation + 'static> MergerChunk for TimelyStack<(D, T, R)> {
579 type TimeOwned = T;
580 type DiffOwned = R;
581
582 fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
583 if upper.less_equal(time) {
584 frontier.insert_with(&time, |time| time.clone());
585 true
586 }
587 else { false }
588 }
589 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) {
590 let (data, time, diff1) = item1;
591 let (_data, _time, diff2) = item2;
592 stash.clone_from(diff1);
593 stash.plus_equals(&diff2);
594 if !stash.is_zero() {
595 self.copy_destructured(data, time, stash);
596 }
597 }
598 fn account(&self) -> (usize, usize, usize, usize) {
599 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
600 let cb = |siz, cap| {
601 size += siz;
602 capacity += cap;
603 allocations += 1;
604 };
605 self.heap_size(cb);
606 (self.len(), size, capacity, allocations)
607 }
608 }
609 }
610}