differential_dataflow/trace/implementations/
merge_batcher.rs

//! A `Batcher` implementation based on merge sort.
//!
//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
//! The chunker receives input batches and consolidates them, producing sorted output
//! "chunks" that are fully consolidated (no two adjacent updates can be accumulated into one).
//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
//! splitting them apart based on time.
//!
//! Implementations of `MergeBatcher` can be instantiated through the choice of both
//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
22/// Creates batches from containers of unordered tuples.
23///
24/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
25/// and must produce outputs of type `M::Chunk`.
26pub struct MergeBatcher<Input, C, M: Merger> {
27    /// Transforms input streams to chunks of sorted, consolidated data.
28    chunker: C,
29    /// A sequence of power-of-two length lists of sorted, consolidated containers.
30    ///
31    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
32    chains: Vec<Vec<M::Chunk>>,
33    /// Stash of empty chunks, recycled through the merging process.
34    stash: Vec<M::Chunk>,
35    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
36    merger: M,
37    /// Current lower frontier, we sealed up to here.
38    lower: Antichain<M::Time>,
39    /// The lower-bound frontier of the data, after the last call to seal.
40    frontier: Antichain<M::Time>,
41    /// Logger for size accounting.
42    logger: Option<Logger>,
43    /// Timely operator ID.
44    operator_id: usize,
45    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
46    _marker: PhantomData<Input>,
47}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51    C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52    M: Merger<Time: Timestamp>,
53{
54    type Input = Input;
55    type Time = M::Time;
56    type Output = M::Chunk;
57
58    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59        Self {
60            logger,
61            operator_id,
62            chunker: C::default(),
63            merger: M::default(),
64            chains: Vec::new(),
65            stash: Vec::new(),
66            frontier: Antichain::new(),
67            lower: Antichain::from_elem(M::Time::minimum()),
68            _marker: PhantomData,
69        }
70    }
71
72    /// Push a container of data into this merge batcher. Updates the internal chain structure if
73    /// needed.
74    fn push_container(&mut self, container: &mut Input) {
75        self.chunker.push_into(container);
76        while let Some(chunk) = self.chunker.extract() {
77            let chunk = std::mem::take(chunk);
78            self.insert_chain(vec![chunk]);
79        }
80    }
81
82    // Sealing a batch means finding those updates with times not greater or equal to any time
83    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
84    // which we call `lower`, by assumption that after sealing a batcher we receive no more
85    // updates with times not greater or equal to `upper`.
86    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87        // Finish
88        while let Some(chunk) = self.chunker.finish() {
89            let chunk = std::mem::take(chunk);
90            self.insert_chain(vec![chunk]);
91        }
92
93        // Merge all remaining chains into a single chain.
94        while self.chains.len() > 1 {
95            let list1 = self.chain_pop().unwrap();
96            let list2 = self.chain_pop().unwrap();
97            let merged = self.merge_by(list1, list2);
98            self.chain_push(merged);
99        }
100        let merged = self.chain_pop().unwrap_or_default();
101
102        // Extract readied data.
103        let mut kept = Vec::new();
104        let mut readied = Vec::new();
105        self.frontier.clear();
106
107        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109        if !kept.is_empty() {
110            self.chain_push(kept);
111        }
112
113        self.stash.clear();
114
115        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116        let seal = B::seal(&mut readied, description);
117        self.lower = upper;
118        seal
119    }
120
121    /// The frontier of elements remaining after the most recent call to `self.seal`.
122    #[inline]
123    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124        self.frontier.borrow()
125    }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
130    /// by decreasing length.
131    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132        if !chain.is_empty() {
133            self.chain_push(chain);
134            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135                let list1 = self.chain_pop().unwrap();
136                let list2 = self.chain_pop().unwrap();
137                let merged = self.merge_by(list1, list2);
138                self.chain_push(merged);
139            }
140        }
141    }
142
143    // merges two sorted input lists into one sorted output list.
144    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145        // TODO: `list1` and `list2` get dropped; would be better to reuse?
146        let mut output = Vec::with_capacity(list1.len() + list2.len());
147        self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149        output
150    }
151
152    /// Pop a chain and account size changes.
153    #[inline]
154    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155        let chain = self.chains.pop();
156        self.account(chain.iter().flatten().map(M::account), -1);
157        chain
158    }
159
160    /// Push a chain and account size changes.
161    #[inline]
162    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163        self.account(chain.iter().map(M::account), 1);
164        self.chains.push(chain);
165    }
166
167    /// Account size changes. Only performs work if a logger exists.
168    ///
169    /// Calculate the size based on the iterator passed along, with each attribute
170    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
171    #[inline]
172    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173        if let Some(logger) = &self.logger {
174            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175            for (records_, size_, capacity_, allocations_) in items {
176                records = records.saturating_add_unsigned(records_);
177                size = size.saturating_add_unsigned(size_);
178                capacity = capacity.saturating_add_unsigned(capacity_);
179                allocations = allocations.saturating_add_unsigned(allocations_);
180            }
181            logger.log(BatcherEvent {
182                operator: self.operator_id,
183                records_diff: records * diff,
184                size_diff: size * diff,
185                capacity_diff: capacity * diff,
186                allocations_diff: allocations * diff,
187            })
188        }
189    }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193    fn drop(&mut self) {
194        // Cleanup chain to retract accounting information.
195        while self.chain_pop().is_some() {}
196    }
197}
198
199/// A trait to describe interesting moments in a merge batcher.
200pub trait Merger: Default {
201    /// The internal representation of chunks of data.
202    type Chunk: Default;
203    /// The type of time in frontiers to extract updates.
204    type Time;
205    /// Merge chains into an output chain.
206    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
207    /// Extract ready updates based on the `upper` frontier.
208    fn extract(
209        &mut self,
210        merged: Vec<Self::Chunk>,
211        upper: AntichainRef<Self::Time>,
212        frontier: &mut Antichain<Self::Time>,
213        readied: &mut Vec<Self::Chunk>,
214        kept: &mut Vec<Self::Chunk>,
215        stash: &mut Vec<Self::Chunk>,
216    );
217
218    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
219    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
220}
221
222pub use container::{VecMerger, ColMerger};
223
224pub mod container {
225
226    //! A general purpose `Merger` implementation for arbitrary containers.
227    //!
228    //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`.
229    //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as
230    //! well as the ability to return the container when iteration is complete.
231    //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container
232    //! items should be interpreted with respect to times, and with respect to differences.
233    //! These two traits exist instead of a stack of constraints on the structure of the associated items
234    //! of the containers, allowing them to perform their functions without destructuring their guts.
235    //!
    //! Standard implementations exist in the `vec` and `columnation` modules.
237
238    use std::cmp::Ordering;
239    use std::marker::PhantomData;
240    use timely::container::{PushInto, SizableContainer};
241    use timely::progress::frontier::{Antichain, AntichainRef};
242    use timely::{Accountable, Data, PartialOrder};
243    use timely::container::DrainContainer;
244    use crate::trace::implementations::merge_batcher::Merger;
245
246    /// An abstraction for a container that can be iterated over, and conclude by returning itself.
247    pub trait ContainerQueue<C: DrainContainer> {
248        /// Returns either the next item in the container, or the container itself.
249        fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
250        /// Indicates whether `next_or_alloc` will return `Ok`, and whether `peek` will return `Some`.
251        fn is_empty(&self) -> bool;
252        /// Compare the heads of two queues, where empty queues come last.
253        fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
254        /// Create a new queue from an existing container.
255        fn from(container: C) -> Self;
256    }
257
258    /// Behavior to dissect items of chunks in the merge batcher
259    pub trait MergerChunk : Accountable + DrainContainer + SizableContainer + Default {
260        /// An owned time type.
261        ///
262        /// This type is provided so that users can maintain antichains of something, in order to track
263        /// the forward movement of time and extract intervals from chains of updates.
264        type TimeOwned;
265        /// The owned diff type.
266        ///
267        /// This type is provided so that users can provide an owned instance to the `push_and_add` method,
268        /// to act as a scratch space when the type is substantial and could otherwise require allocations.
269        type DiffOwned: Default;
270
271        /// Relates a borrowed time to antichains of owned times.
272        ///
273        /// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`.
274        fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;
275
276        /// Push an entry that adds together two diffs.
277        ///
278        /// This is only called when two items are deemed mergeable by the container queue.
279        /// If the two diffs added together is zero do not push anything.
280        fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);
281
282        /// Account the allocations behind the chunk.
283        // TODO: Find a more universal home for this: `Container`?
284        fn account(&self) -> (usize, usize, usize, usize) {
285            let (size, capacity, allocations) = (0, 0, 0);
286            (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
287        }
288
289        /// Clear the chunk, to be reused.
290        fn clear(&mut self);
291    }
292
293    /// A merger for arbitrary containers.
294    ///
295    /// `MC` is a [`Container`] that implements [`MergerChunk`].
296    /// `CQ` is a [`ContainerQueue`] supporting `MC`.
297    pub struct ContainerMerger<MC, CQ> {
298        _marker: PhantomData<(MC, CQ)>,
299    }
300
301    impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
302        fn default() -> Self {
303            Self { _marker: PhantomData, }
304        }
305    }
306
307    impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
308        /// Helper to get pre-sized vector from the stash.
309        #[inline]
310        fn empty(&self, stash: &mut Vec<MC>) -> MC {
311            stash.pop().unwrap_or_else(|| {
312                let mut container = MC::default();
313                container.ensure_capacity(&mut None);
314                container
315            })
316        }
317        /// Helper to return a chunk to the stash.
318        #[inline]
319        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
320            // TODO: Should we only retain correctly sized containers?
321            chunk.clear();
322            stash.push(chunk);
323        }
324    }
325
326    impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
327    where
328        for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data> + Clone + PushInto<<MC as DrainContainer>::Item<'a>> + 'static,
329        CQ: ContainerQueue<MC>,
330    {
331        type Time = MC::TimeOwned;
332        type Chunk = MC;
333
334        // TODO: Consider integrating with `ConsolidateLayout`.
335        fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
336            let mut list1 = list1.into_iter();
337            let mut list2 = list2.into_iter();
338
339            let mut head1 = CQ::from(list1.next().unwrap_or_default());
340            let mut head2 = CQ::from(list2.next().unwrap_or_default());
341
342            let mut result = self.empty(stash);
343
344            let mut diff_owned = Default::default();
345
346            // while we have valid data in each input, merge.
347            while !head1.is_empty() && !head2.is_empty() {
348                while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
349                    let cmp = head1.cmp_heads(&head2);
350                    // TODO: The following less/greater branches could plausibly be a good moment for
351                    // `copy_range`, on account of runs of records that might benefit more from a
352                    // `memcpy`.
353                    match cmp {
354                        Ordering::Less => {
355                            result.push_into(head1.next_or_alloc().ok().unwrap());
356                        }
357                        Ordering::Greater => {
358                            result.push_into(head2.next_or_alloc().ok().unwrap());
359                        }
360                        Ordering::Equal => {
361                            let item1 = head1.next_or_alloc().ok().unwrap();
362                            let item2 = head2.next_or_alloc().ok().unwrap();
363                            result.push_and_add(item1, item2, &mut diff_owned);
364                       }
365                    }
366                }
367
368                if result.at_capacity() {
369                    output.push_into(result);
370                    result = self.empty(stash);
371                }
372
373                if head1.is_empty() {
374                    self.recycle(head1.next_or_alloc().err().unwrap(), stash);
375                    head1 = CQ::from(list1.next().unwrap_or_default());
376                }
377                if head2.is_empty() {
378                    self.recycle(head2.next_or_alloc().err().unwrap(), stash);
379                    head2 = CQ::from(list2.next().unwrap_or_default());
380                }
381            }
382
383            // TODO: recycle `head1` rather than discarding.
384            while let Ok(next) = head1.next_or_alloc() {
385                result.push_into(next);
386                if result.at_capacity() {
387                    output.push_into(result);
388                    result = self.empty(stash);
389                }
390            }
391            if !result.is_empty() {
392                output.push_into(result);
393                result = self.empty(stash);
394            }
395            output.extend(list1);
396
397            // TODO: recycle `head2` rather than discarding.
398            while let Ok(next) = head2.next_or_alloc() {
399                result.push_into(next);
400                if result.at_capacity() {
401                    output.push(result);
402                    result = self.empty(stash);
403                }
404            }
405            if !result.is_empty() {
406                output.push_into(result);
407                // result = self.empty(stash);
408            }
409            output.extend(list2);
410        }
411
412        fn extract(
413            &mut self,
414            merged: Vec<Self::Chunk>,
415            upper: AntichainRef<Self::Time>,
416            frontier: &mut Antichain<Self::Time>,
417            readied: &mut Vec<Self::Chunk>,
418            kept: &mut Vec<Self::Chunk>,
419            stash: &mut Vec<Self::Chunk>,
420        ) {
421            let mut keep = self.empty(stash);
422            let mut ready = self.empty(stash);
423
424            for mut buffer in merged {
425                for item in buffer.drain() {
426                    if MC::time_kept(&item, &upper, frontier) {
427                        if keep.at_capacity() && !keep.is_empty() {
428                            kept.push(keep);
429                            keep = self.empty(stash);
430                        }
431                        keep.push_into(item);
432                    } else {
433                        if ready.at_capacity() && !ready.is_empty() {
434                            readied.push(ready);
435                            ready = self.empty(stash);
436                        }
437                        ready.push_into(item);
438                    }
439                }
440                // Recycling buffer.
441                self.recycle(buffer, stash);
442            }
443            // Finish the kept data.
444            if !keep.is_empty() {
445                kept.push(keep);
446            }
447            if !ready.is_empty() {
448                readied.push(ready);
449            }
450        }
451
452        /// Account the allocations behind the chunk.
453        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
454            chunk.account()
455        }
456    }
457
458    pub use vec::VecMerger;
459    /// Implementations of `ContainerQueue` and `MergerChunk` for `Vec` containers.
460    pub mod vec {
461
462        use std::collections::VecDeque;
463        use timely::progress::{Antichain, frontier::AntichainRef};
464        use crate::difference::Semigroup;
465        use super::{ContainerQueue, MergerChunk};
466
467        /// A `Merger` implementation backed by vector containers.
468        pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
469
470        impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
471            fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
472                if self.is_empty() {
473                    Err(Vec::from(std::mem::take(self)))
474                }
475                else {
476                    Ok(self.pop_front().unwrap())
477                }
478            }
479            fn is_empty(&self) -> bool {
480                self.is_empty()
481            }
482            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
483                let (data1, time1, _) = self.front().unwrap();
484                let (data2, time2, _) = other.front().unwrap();
485                (data1, time1).cmp(&(data2, time2))
486            }
487            fn from(list: Vec<(D, T, R)>) -> Self {
488                <Self as From<_>>::from(list)
489            }
490        }
491
492        impl<D: Ord + 'static, T: Ord + timely::PartialOrder + Clone + 'static, R: Semigroup + 'static> MergerChunk for Vec<(D, T, R)> {
493            type TimeOwned = T;
494            type DiffOwned = ();
495
496            fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
497                if upper.less_equal(time) {
498                    frontier.insert_with(&time, |time| time.clone());
499                    true
500                }
501                else { false }
502            }
503            fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) {
504                let (data, time, mut diff1) = item1;
505                let (_data, _time, diff2) = item2;
506                diff1.plus_equals(&diff2);
507                if !diff1.is_zero() {
508                    self.push((data, time, diff1));
509                }
510            }
511            fn account(&self) -> (usize, usize, usize, usize) {
512                let (size, capacity, allocations) = (0, 0, 0);
513                (self.len(), size, capacity, allocations)
514            }
515            #[inline] fn clear(&mut self) { Vec::clear(self) }
516        }
517    }
518
519    pub use columnation::ColMerger;
520    /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation).
521    pub mod columnation {
522
523        use timely::progress::{Antichain, frontier::AntichainRef};
524        use columnation::Columnation;
525
526        use crate::containers::TimelyStack;
527        use crate::difference::Semigroup;
528
529        use super::{ContainerQueue, MergerChunk};
530
531        /// A `Merger` implementation backed by `TimelyStack` containers (columnation).
532        pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;
533
534        /// TODO
535        pub struct TimelyStackQueue<T: Columnation> {
536            list: TimelyStack<T>,
537            head: usize,
538        }
539
540        impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation> ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)> {
541            fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
542                if self.is_empty() {
543                    Err(std::mem::take(&mut self.list))
544                }
545                else {
546                    Ok(self.pop())
547                }
548            }
549            fn is_empty(&self) -> bool {
550                self.head == self.list[..].len()
551            }
552            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
553                let (data1, time1, _) = self.peek();
554                let (data2, time2, _) = other.peek();
555                (data1, time1).cmp(&(data2, time2))
556            }
557            fn from(list: TimelyStack<(D, T, R)>) -> Self {
558                TimelyStackQueue { list, head: 0 }
559            }
560        }
561
562        impl<T: Columnation> TimelyStackQueue<T> {
563            fn pop(&mut self) -> &T {
564                self.head += 1;
565                &self.list[self.head - 1]
566            }
567
568            fn peek(&self) -> &T {
569                &self.list[self.head]
570            }
571        }
572
573        impl<D: Ord + Columnation + 'static, T: Ord + timely::PartialOrder + Clone + Columnation + 'static, R: Default + Semigroup + Columnation + 'static> MergerChunk for TimelyStack<(D, T, R)> {
574            type TimeOwned = T;
575            type DiffOwned = R;
576
577            fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
578                if upper.less_equal(time) {
579                    frontier.insert_with(&time, |time| time.clone());
580                    true
581                }
582                else { false }
583            }
584            fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) {
585                let (data, time, diff1) = item1;
586                let (_data, _time, diff2) = item2;
587                stash.clone_from(diff1);
588                stash.plus_equals(&diff2);
589                if !stash.is_zero() {
590                    self.copy_destructured(data, time, stash);
591                }
592            }
593            fn account(&self) -> (usize, usize, usize, usize) {
594                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
595                let cb = |siz, cap| {
596                    size += siz;
597                    capacity += cap;
598                    allocations += 1;
599                };
600                self.heap_size(cb);
601                (self.len(), size, capacity, allocations)
602            }
603            #[inline] fn clear(&mut self) { TimelyStack::clear(self) }
604        }
605    }
606}