differential_dataflow/trace/implementations/
merge_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2//!
3//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4//! The chunker receives input batches and consolidates them, producing sorted output
5//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8//! splitting them apart based on time.
9//!
10//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::Container;
18use timely::container::{ContainerBuilder, PushInto};
19
20use crate::logging::{BatcherEvent, Logger};
21use crate::trace::{Batcher, Builder, Description};
22
23/// Creates batches from containers of unordered tuples.
24///
25/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
26/// and must produce outputs of type `M::Chunk`.
27pub struct MergeBatcher<Input, C, M: Merger> {
28    /// Transforms input streams to chunks of sorted, consolidated data.
29    chunker: C,
30    /// A sequence of power-of-two length lists of sorted, consolidated containers.
31    ///
32    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
33    chains: Vec<Vec<M::Chunk>>,
34    /// Stash of empty chunks, recycled through the merging process.
35    stash: Vec<M::Chunk>,
36    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
37    merger: M,
38    /// Current lower frontier, we sealed up to here.
39    lower: Antichain<M::Time>,
40    /// The lower-bound frontier of the data, after the last call to seal.
41    frontier: Antichain<M::Time>,
42    /// Logger for size accounting.
43    logger: Option<Logger>,
44    /// Timely operator ID.
45    operator_id: usize,
46    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
47    _marker: PhantomData<Input>,
48}
49
50impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
51where
52    C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
53    M: Merger,
54    M::Time: Timestamp,
55{
56    type Input = Input;
57    type Time = M::Time;
58    type Output = M::Chunk;
59
60    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
61        Self {
62            logger,
63            operator_id,
64            chunker: C::default(),
65            merger: M::default(),
66            chains: Vec::new(),
67            stash: Vec::new(),
68            frontier: Antichain::new(),
69            lower: Antichain::from_elem(M::Time::minimum()),
70            _marker: PhantomData,
71        }
72    }
73
74    /// Push a container of data into this merge batcher. Updates the internal chain structure if
75    /// needed.
76    fn push_container(&mut self, container: &mut Input) {
77        self.chunker.push_into(container);
78        while let Some(chunk) = self.chunker.extract() {
79            let chunk = std::mem::take(chunk);
80            self.insert_chain(vec![chunk]);
81        }
82    }
83
84    // Sealing a batch means finding those updates with times not greater or equal to any time
85    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
86    // which we call `lower`, by assumption that after sealing a batcher we receive no more
87    // updates with times not greater or equal to `upper`.
88    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
89        // Finish
90        while let Some(chunk) = self.chunker.finish() {
91            let chunk = std::mem::take(chunk);
92            self.insert_chain(vec![chunk]);
93        }
94
95        // Merge all remaining chains into a single chain.
96        while self.chains.len() > 1 {
97            let list1 = self.chain_pop().unwrap();
98            let list2 = self.chain_pop().unwrap();
99            let merged = self.merge_by(list1, list2);
100            self.chain_push(merged);
101        }
102        let merged = self.chain_pop().unwrap_or_default();
103
104        // Extract readied data.
105        let mut kept = Vec::new();
106        let mut readied = Vec::new();
107        self.frontier.clear();
108
109        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
110
111        if !kept.is_empty() {
112            self.chain_push(kept);
113        }
114
115        self.stash.clear();
116
117        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
118        let seal = B::seal(&mut readied, description);
119        self.lower = upper;
120        seal
121    }
122
123    /// The frontier of elements remaining after the most recent call to `self.seal`.
124    #[inline]
125    fn frontier(&mut self) -> AntichainRef<M::Time> {
126        self.frontier.borrow()
127    }
128}
129
130impl<Input, C, M> MergeBatcher<Input, C, M>
131where
132    M: Merger,
133{
134    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
135    /// by decreasing length.
136    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
137        if !chain.is_empty() {
138            self.chain_push(chain);
139            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
140                let list1 = self.chain_pop().unwrap();
141                let list2 = self.chain_pop().unwrap();
142                let merged = self.merge_by(list1, list2);
143                self.chain_push(merged);
144            }
145        }
146    }
147
148    // merges two sorted input lists into one sorted output list.
149    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
150        // TODO: `list1` and `list2` get dropped; would be better to reuse?
151        let mut output = Vec::with_capacity(list1.len() + list2.len());
152        self.merger.merge(list1, list2, &mut output, &mut self.stash);
153
154        output
155    }
156
157    /// Pop a chain and account size changes.
158    #[inline]
159    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
160        let chain = self.chains.pop();
161        self.account(chain.iter().flatten().map(M::account), -1);
162        chain
163    }
164
165    /// Push a chain and account size changes.
166    #[inline]
167    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
168        self.account(chain.iter().map(M::account), 1);
169        self.chains.push(chain);
170    }
171
172    /// Account size changes. Only performs work if a logger exists.
173    ///
174    /// Calculate the size based on the iterator passed along, with each attribute
175    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
176    #[inline]
177    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
178        if let Some(logger) = &self.logger {
179            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
180            for (records_, size_, capacity_, allocations_) in items {
181                records = records.saturating_add_unsigned(records_);
182                size = size.saturating_add_unsigned(size_);
183                capacity = capacity.saturating_add_unsigned(capacity_);
184                allocations = allocations.saturating_add_unsigned(allocations_);
185            }
186            logger.log(BatcherEvent {
187                operator: self.operator_id,
188                records_diff: records * diff,
189                size_diff: size * diff,
190                capacity_diff: capacity * diff,
191                allocations_diff: allocations * diff,
192            })
193        }
194    }
195}
196
197impl<Input, C, M> Drop for MergeBatcher<Input, C, M>
198where
199    M: Merger,
200{
201    fn drop(&mut self) {
202        // Cleanup chain to retract accounting information.
203        while self.chain_pop().is_some() {}
204    }
205}
206
207/// A trait to describe interesting moments in a merge batcher.
208pub trait Merger: Default {
209    /// The internal representation of chunks of data.
210    type Chunk: Container;
211    /// The type of time in frontiers to extract updates.
212    type Time;
213    /// Merge chains into an output chain.
214    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
215    /// Extract ready updates based on the `upper` frontier.
216    fn extract(
217        &mut self,
218        merged: Vec<Self::Chunk>,
219        upper: AntichainRef<Self::Time>,
220        frontier: &mut Antichain<Self::Time>,
221        readied: &mut Vec<Self::Chunk>,
222        kept: &mut Vec<Self::Chunk>,
223        stash: &mut Vec<Self::Chunk>,
224    );
225
226    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
227    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
228}
229
230pub use container::{VecMerger, ColMerger};
231
232pub mod container {
233
234    //! A general purpose `Merger` implementation for arbitrary containers.
235    //!
236    //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`.
237    //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as
238    //! well as the ability to return the container when iteration is complete.
239    //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container
240    //! items should be interpreted with respect to times, and with respect to differences.
241    //! These two traits exist instead of a stack of constraints on the structure of the associated items
242    //! of the containers, allowing them to perform their functions without destructuring their guts.
243    //!
    //! Standard implementations exist in the `vec` and `columnation` submodules below.
245
246    use std::cmp::Ordering;
247    use std::marker::PhantomData;
248    use timely::{Container, container::{PushInto, SizableContainer}};
249    use timely::progress::frontier::{Antichain, AntichainRef};
250    use timely::{Data, PartialOrder};
251
252    use crate::trace::implementations::merge_batcher::Merger;
253
254    /// An abstraction for a container that can be iterated over, and conclude by returning itself.
255    pub trait ContainerQueue<C: Container> {
256        /// Returns either the next item in the container, or the container itself.
257        fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
258        /// Indicates whether `next_or_alloc` will return `Ok`, and whether `peek` will return `Some`.
259        fn is_empty(&self) -> bool;
260        /// Compare the heads of two queues, where empty queues come last.
261        fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
262        /// Create a new queue from an existing container.
263        fn from(container: C) -> Self;
264    }
265
266    /// Behavior to dissect items of chunks in the merge batcher
267    pub trait MergerChunk : SizableContainer {
268        /// An owned time type.
269        ///
270        /// This type is provided so that users can maintain antichains of something, in order to track
271        /// the forward movement of time and extract intervals from chains of updates.
272        type TimeOwned;
273        /// The owned diff type.
274        ///
275        /// This type is provided so that users can provide an owned instance to the `push_and_add` method,
276        /// to act as a scratch space when the type is substantial and could otherwise require allocations.
277        type DiffOwned: Default;
278
279        /// Relates a borrowed time to antichains of owned times.
280        ///
281        /// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`.
282        fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;
283
284        /// Push an entry that adds together two diffs.
285        ///
286        /// This is only called when two items are deemed mergeable by the container queue.
287        /// If the two diffs added together is zero do not push anything.
288        fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);
289
290        /// Account the allocations behind the chunk.
291        // TODO: Find a more universal home for this: `Container`?
292        fn account(&self) -> (usize, usize, usize, usize) {
293            let (size, capacity, allocations) = (0, 0, 0);
294            (self.len(), size, capacity, allocations)
295        }
296    }
297
298    /// A merger for arbitrary containers.
299    ///
300    /// `MC` is a [`Container`] that implements [`MergerChunk`].
301    /// `CQ` is a [`ContainerQueue`] supporting `MC`.
302    pub struct ContainerMerger<MC, CQ> {
303        _marker: PhantomData<(MC, CQ)>,
304    }
305
306    impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
307        fn default() -> Self {
308            Self { _marker: PhantomData, }
309        }
310    }
311
312    impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
313        /// Helper to get pre-sized vector from the stash.
314        #[inline]
315        fn empty(&self, stash: &mut Vec<MC>) -> MC {
316            stash.pop().unwrap_or_else(|| {
317                let mut container = MC::default();
318                container.ensure_capacity(&mut None);
319                container
320            })
321        }
322        /// Helper to return a chunk to the stash.
323        #[inline]
324        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
325            // TODO: Should we only retain correctly sized containers?
326            chunk.clear();
327            stash.push(chunk);
328        }
329    }
330
    // Generic merger: merges and splits chains of any chunk type `MC` implementing
    // `MergerChunk`, reading chunks through the queue type `CQ`.
    impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
    where
        for<'a> MC: MergerChunk + Clone + PushInto<<MC as Container>::Item<'a>> + 'static,
        for<'a> MC::TimeOwned: Ord + PartialOrder + Data,
        CQ: ContainerQueue<MC>,
    {
        type Time = MC::TimeOwned;
        type Chunk = MC;

        /// Merges the sorted chains `list1` and `list2`, appending the sorted result to `output`.
        ///
        /// Items whose heads compare equal (per `CQ::cmp_heads`) are combined through
        /// `MergerChunk::push_and_add`, which pushes nothing when the summed diff is zero.
        /// Fully consumed input chunks are cleared and pushed onto `stash`. New output chunks
        /// are taken from `stash` when it has any.
        // TODO: Consider integrating with `ConsolidateLayout`.
        fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
            let mut list1 = list1.into_iter();
            let mut list2 = list2.into_iter();

            // Queues over the current chunk of each chain; an exhausted chain yields an empty
            // default chunk.
            let mut head1 = CQ::from(list1.next().unwrap_or_default());
            let mut head2 = CQ::from(list2.next().unwrap_or_default());

            let mut result = self.empty(stash);

            // Scratch diff reused by `push_and_add` across all equal-key pairs.
            let mut diff_owned = Default::default();

            // while we have valid data in each input, merge.
            while !head1.is_empty() && !head2.is_empty() {
                // Fill `result` until it is full or one of the current chunks runs out.
                while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
                    let cmp = head1.cmp_heads(&head2);
                    // TODO: The following less/greater branches could plausibly be a good moment for
                    // `copy_range`, on account of runs of records that might benefit more from a
                    // `memcpy`.
                    match cmp {
                        Ordering::Less => {
                            result.push_into(head1.next_or_alloc().ok().unwrap());
                        }
                        Ordering::Greater => {
                            result.push_into(head2.next_or_alloc().ok().unwrap());
                        }
                        Ordering::Equal => {
                            let item1 = head1.next_or_alloc().ok().unwrap();
                            let item2 = head2.next_or_alloc().ok().unwrap();
                            result.push_and_add(item1, item2, &mut diff_owned);
                       }
                    }
                }

                // Ship a full output chunk and continue with a fresh (possibly recycled) one.
                if result.at_capacity() {
                    output.push_into(result);
                    result = self.empty(stash);
                }

                // Recycle an exhausted input chunk and advance to the next chunk of its chain.
                // `next_or_alloc` on an empty queue returns the container as `Err`.
                if head1.is_empty() {
                    self.recycle(head1.next_or_alloc().err().unwrap(), stash);
                    head1 = CQ::from(list1.next().unwrap_or_default());
                }
                if head2.is_empty() {
                    self.recycle(head2.next_or_alloc().err().unwrap(), stash);
                    head2 = CQ::from(list2.next().unwrap_or_default());
                }
            }

            // At least one side is now empty. Copy the rest of each head item by item, then
            // append that chain's remaining (already sorted, consolidated) chunks unchanged.

            // TODO: recycle `head1` rather than discarding.
            while let Ok(next) = head1.next_or_alloc() {
                result.push_into(next);
                if result.at_capacity() {
                    output.push_into(result);
                    result = self.empty(stash);
                }
            }
            if !result.is_empty() {
                output.push_into(result);
                result = self.empty(stash);
            }
            output.extend(list1);

            // TODO: recycle `head2` rather than discarding.
            while let Ok(next) = head2.next_or_alloc() {
                result.push_into(next);
                if result.at_capacity() {
                    output.push(result);
                    result = self.empty(stash);
                }
            }
            if !result.is_empty() {
                output.push_into(result);
                // result = self.empty(stash);
            }
            output.extend(list2);
        }

        /// Splits the chain `merged` into updates to keep and updates that are ready.
        ///
        /// Items for which `MergerChunk::time_kept` returns `true` (`upper` is less or equal to
        /// their time) are appended to `kept`, and `time_kept` updates `frontier` with their
        /// times. All other items are appended to `readied`. Both outputs preserve input order.
        /// Each input chunk is drained and then recycled into `stash`.
        fn extract(
            &mut self,
            merged: Vec<Self::Chunk>,
            upper: AntichainRef<Self::Time>,
            frontier: &mut Antichain<Self::Time>,
            readied: &mut Vec<Self::Chunk>,
            kept: &mut Vec<Self::Chunk>,
            stash: &mut Vec<Self::Chunk>,
        ) {
            let mut keep = self.empty(stash);
            let mut ready = self.empty(stash);

            for mut buffer in merged {
                for item in buffer.drain() {
                    if MC::time_kept(&item, &upper, frontier) {
                        // Flush a full chunk before pushing; the `!is_empty()` check ensures an
                        // empty chunk is never emitted.
                        if keep.at_capacity() && !keep.is_empty() {
                            kept.push(keep);
                            keep = self.empty(stash);
                        }
                        keep.push_into(item);
                    } else {
                        if ready.at_capacity() && !ready.is_empty() {
                            readied.push(ready);
                            ready = self.empty(stash);
                        }
                        ready.push_into(item);
                    }
                }
                // Recycling buffer.
                self.recycle(buffer, stash);
            }
            // Finish the kept data.
            if !keep.is_empty() {
                kept.push(keep);
            }
            if !ready.is_empty() {
                readied.push(ready);
            }
        }

        /// Account the allocations behind the chunk.
        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
            chunk.account()
        }
    }
463
464    pub use vec::VecMerger;
465    /// Implementations of `ContainerQueue` and `MergerChunk` for `Vec` containers.
466    pub mod vec {
467
468        use std::collections::VecDeque;
469        use timely::progress::{Antichain, frontier::AntichainRef};
470        use crate::difference::Semigroup;
471        use super::{ContainerQueue, MergerChunk};
472
473        /// A `Merger` implementation backed by vector containers.
474        pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
475
476        impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
477            fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
478                if self.is_empty() {
479                    Err(Vec::from(std::mem::take(self)))
480                }
481                else {
482                    Ok(self.pop_front().unwrap())
483                }
484            }
485            fn is_empty(&self) -> bool {
486                self.is_empty()
487            }
488            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
489                let (data1, time1, _) = self.front().unwrap();
490                let (data2, time2, _) = other.front().unwrap();
491                (data1, time1).cmp(&(data2, time2))
492            }
493            fn from(list: Vec<(D, T, R)>) -> Self {
494                <Self as From<_>>::from(list)
495            }
496        }
497
498        impl<D: Ord + 'static, T: Ord + timely::PartialOrder + Clone + 'static, R: Semigroup + 'static> MergerChunk for Vec<(D, T, R)> {
499            type TimeOwned = T;
500            type DiffOwned = ();
501
502            fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
503                if upper.less_equal(time) {
504                    frontier.insert_with(&time, |time| time.clone());
505                    true
506                }
507                else { false }
508            }
509            fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) {
510                let (data, time, mut diff1) = item1;
511                let (_data, _time, diff2) = item2;
512                diff1.plus_equals(&diff2);
513                if !diff1.is_zero() {
514                    self.push((data, time, diff1));
515                }
516            }
517            fn account(&self) -> (usize, usize, usize, usize) {
518                let (size, capacity, allocations) = (0, 0, 0);
519                (self.len(), size, capacity, allocations)
520            }
521        }
522    }
523
524    pub use columnation::ColMerger;
525    /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation).
526    pub mod columnation {
527
528        use timely::progress::{Antichain, frontier::AntichainRef};
529        use columnation::Columnation;
530
531        use crate::containers::TimelyStack;
532        use crate::difference::Semigroup;
533
534        use super::{ContainerQueue, MergerChunk};
535
536        /// A `Merger` implementation backed by `TimelyStack` containers (columnation).
537        pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;
538
539        /// TODO
540        pub struct TimelyStackQueue<T: Columnation> {
541            list: TimelyStack<T>,
542            head: usize,
543        }
544
545        impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation> ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)> {
546            fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
547                if self.is_empty() {
548                    Err(std::mem::take(&mut self.list))
549                }
550                else {
551                    Ok(self.pop())
552                }
553            }
554            fn is_empty(&self) -> bool {
555                self.head == self.list[..].len()
556            }
557            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
558                let (data1, time1, _) = self.peek();
559                let (data2, time2, _) = other.peek();
560                (data1, time1).cmp(&(data2, time2))
561            }
562            fn from(list: TimelyStack<(D, T, R)>) -> Self {
563                TimelyStackQueue { list, head: 0 }
564            }
565        }
566
567        impl<T: Columnation> TimelyStackQueue<T> {
568            fn pop(&mut self) -> &T {
569                self.head += 1;
570                &self.list[self.head - 1]
571            }
572
573            fn peek(&self) -> &T {
574                &self.list[self.head]
575            }
576        }
577
578        impl<D: Ord + Columnation + 'static, T: Ord + timely::PartialOrder + Clone + Columnation + 'static, R: Default + Semigroup + Columnation + 'static> MergerChunk for TimelyStack<(D, T, R)> {
579            type TimeOwned = T;
580            type DiffOwned = R;
581
582            fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
583                if upper.less_equal(time) {
584                    frontier.insert_with(&time, |time| time.clone());
585                    true
586                }
587                else { false }
588            }
589            fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) {
590                let (data, time, diff1) = item1;
591                let (_data, _time, diff2) = item2;
592                stash.clone_from(diff1);
593                stash.plus_equals(&diff2);
594                if !stash.is_zero() {
595                    self.copy_destructured(data, time, stash);
596                }
597            }
598            fn account(&self) -> (usize, usize, usize, usize) {
599                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
600                let cb = |siz, cap| {
601                    size += siz;
602                    capacity += cap;
603                    allocations += 1;
604                };
605                self.heap_size(cb);
606                (self.len(), size, capacity, allocations)
607            }
608        }
609    }
610}