differential_dataflow/operators/
reduce.rs

1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use timely::Container;
9use timely::container::PushInto;
10use crate::hashable::Hashable;
11use crate::{Data, ExchangeData, Collection};
12use crate::difference::{Semigroup, Abelian};
13
14use timely::order::PartialOrder;
15use timely::progress::frontier::Antichain;
16use timely::progress::Timestamp;
17use timely::dataflow::*;
18use timely::dataflow::operators::Operator;
19use timely::dataflow::channels::pact::Pipeline;
20use timely::dataflow::operators::Capability;
21
22use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
23use crate::lattice::Lattice;
24use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
25use crate::trace::cursor::CursorList;
26use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder};
27use crate::trace::implementations::containers::BatchContainer;
28
29use crate::trace::TraceReader;
30
/// Extension trait for the `reduce` differential dataflow method.
pub trait Reduce<G: Scope<Timestamp: Lattice+Ord>, K: Data, V: Data, R: Semigroup> {
    /// Applies a reduction function on records grouped by key.
    ///
    /// Input data must be structured as `(key, val)` pairs.
    /// The user-supplied reduction function takes as arguments
    ///
    /// 1. a reference to the key,
    /// 2. a reference to the slice of values and their accumulated updates,
    /// 3. a mutable reference to a vector to populate with output values and accumulated updates.
    ///
    /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
    /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
    /// `Ord` implementations.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Reduce;
    ///
    /// ::timely::example(|scope| {
    ///     // report the smallest value for each group
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| (x / 3, x))
    ///          .reduce(|_key, input, output| {
    ///              output.push((*input[0].0, 1))
    ///          });
    /// });
    /// ```
    fn reduce<L, V2: Data, R2: Ord+Abelian+'static>(&self, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
        self.reduce_named("Reduce", logic)
    }

    /// As `reduce` with the ability to name the operator.
    ///
    /// The name appears in timely dataflow diagnostics and logging, which helps
    /// distinguish multiple reductions within the same dataflow.
    fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static;
}
70
71impl<G, K, V, R> Reduce<G, K, V, R> for Collection<G, (K, V), R>
72    where
73        G: Scope<Timestamp: Lattice+Ord>,
74        K: ExchangeData+Hashable,
75        V: ExchangeData,
76        R: ExchangeData+Semigroup,
77 {
78    fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
79        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
80        self.arrange_by_key_named(&format!("Arrange: {}", name))
81            .reduce_named(name, logic)
82    }
83}
84
85impl<G, K: Data, V: Data, T1, R: Ord+Semigroup+'static> Reduce<G, K, V, R> for Arranged<G, T1>
86where
87    G: Scope<Timestamp=T1::Time>,
88    T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a V, Diff=R>+Clone+'static,
89{
90    fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
91        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
92        self.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
93            .as_collection(|k,v| (k.clone(), v.clone()))
94    }
95}
96
/// Extension trait for the `threshold` and `distinct` differential dataflow methods.
pub trait Threshold<G: Scope<Timestamp: Lattice+Ord>, K: Data, R1: Semigroup> {
    /// Transforms the multiplicity of records.
    ///
    /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
    /// least the computation may behave as if it does. Otherwise, the transformation
    /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .threshold(|_,c| c % 2);
    /// });
    /// ```
    fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        self.threshold_named("Threshold", thresh)
    }

    /// A `threshold` with the ability to name the operator.
    ///
    /// The name appears in timely dataflow diagnostics and logging.
    fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2>;

    /// Reduces the collection to one occurrence of each distinct element.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .distinct();
    /// });
    /// ```
    fn distinct(&self) -> Collection<G, K, isize> {
        self.distinct_core()
    }

    /// Distinct for general integer differences.
    ///
    /// This method allows `distinct` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    ///
    /// Every key present in the input is recorded once in the output, with
    /// multiplicity `R2::from(1i8)`.
    fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(&self) -> Collection<G, K, R2> {
        self.threshold_named("Distinct", |_,_| R2::from(1i8))
    }
}
153
154impl<G: Scope<Timestamp: Lattice+Ord>, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1> for Collection<G, K, R1> {
155    fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
156        self.arrange_by_self_named(&format!("Arrange: {}", name))
157            .threshold_named(name, thresh)
158    }
159}
160
161impl<G, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
162where
163    G: Scope<Timestamp=T1::Time>,
164    T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static,
165{
166    fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
167        self.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
168            .as_collection(|k,_| k.clone())
169    }
170}
171
/// Extension trait for the `count` differential dataflow method.
pub trait Count<G: Scope<Timestamp: Lattice+Ord>, K: Data, R: Semigroup> {
    /// Counts the number of occurrences of each element.
    ///
    /// The output collection contains `(element, count)` pairs, where `count` is
    /// the accumulated update for the element in the input.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Count;
    ///
    /// ::timely::example(|scope| {
    ///     // report the number of occurrences of each key
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .count();
    /// });
    /// ```
    fn count(&self) -> Collection<G, (K, R), isize> {
        self.count_core()
    }

    /// Count for general integer differences.
    ///
    /// This method allows `count` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2>;
}
200
201impl<G: Scope<Timestamp: Lattice+Ord>, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K, R> for Collection<G, K, R> {
202    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
203        self.arrange_by_self_named("Arrange: Count")
204            .count_core()
205    }
206}
207
208impl<G, K: Data, T1, R: Data+Semigroup> Count<G, K, R> for Arranged<G, T1>
209where
210    G: Scope<Timestamp=T1::Time>,
211    T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static,
212{
213    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
214        self.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
215            .as_collection(|k,c| (k.clone(), c.clone()))
216    }
217}
218
/// Extension trait for the `reduce_core` differential dataflow method.
pub trait ReduceCore<G: Scope<Timestamp: Lattice+Ord>, K: ToOwned + ?Sized, V: Data, R: Semigroup> {
    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
    ///
    /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
    /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::reduce::ReduceCore;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let trace =
    ///     scope.new_collection_from(1 .. 10u32).1
    ///          .map(|x| (x, x))
    ///          .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
    ///             "Example",
    ///              move |_key, src, dst| dst.push((*src[0].0, 1))
    ///          )
    ///          .trace;
    /// });
    /// ```
    fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<
                Key<'a>= &'a K,
                KeyOwn = K,
                ValOwn = V,
                Time=G::Timestamp,
                Diff: Abelian,
            >+'static,
            Bu: Builder<Time=T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
        {
            self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
                // Only consult the user logic for non-empty inputs; `logic` may
                // assume its input slice is non-empty.
                if !input.is_empty() {
                    logic(key, input, change);
                }
                // Fold in the negation of the current output, so that `change` becomes
                // the difference required to move the output to its new value. This
                // relies on `T2::Diff: Abelian` (negation must exist).
                change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
                crate::consolidation::consolidate(change);
            })
        }

    /// Solves for output updates when presented with inputs and would-be outputs.
    ///
    /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
    /// and it may not be safe to index into the first element.
    /// At least one of the two collections will be non-empty.
    fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<
                Key<'a>=&'a K,
                KeyOwn = K,
                ValOwn = V,
                Time=G::Timestamp,
            >+'static,
            Bu: Builder<Time=T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
            ;
}
284
285impl<G, K, V, R> ReduceCore<G, K, V, R> for Collection<G, (K, V), R>
286where
287    G: Scope,
288    G::Timestamp: Lattice+Ord,
289    K: ExchangeData+Hashable,
290    V: ExchangeData,
291    R: ExchangeData+Semigroup,
292{
293    fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
294        where
295            V: Data,
296            T2: for<'a> Trace<
297                Key<'a>=&'a K,
298                KeyOwn = K,
299                ValOwn = V,
300                Time=G::Timestamp,
301            >+'static,
302            Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
303            L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
304    {
305        self.arrange_by_key_named(&format!("Arrange: {}", name))
306            .reduce_core::<_,Bu,_>(name, logic)
307    }
308}
309
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
///
/// The `logic` callback receives (1) the key, (2) the accumulated input values, (3) a buffer of
/// would-be output values, and (4) a buffer into which it writes output updates.
pub fn reduce_trace<G, T1, Bu, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
    G: Scope<Timestamp=T1::Time>,
    T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
    T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
    Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
    L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
    // Populated from inside the operator closure below, then moved into the returned `Arranged`.
    let mut result_trace = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let stream = {

        let result_trace = &mut result_trace;
        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

            // Acquire a logger for arrange events.
            let logger = trace.stream.scope().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

            let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
            let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
            // If there is default exert logic set, install it.
            if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
                empty.set_exert_logic(exert_logic);
            }


            let mut source_trace = trace.trace.clone();

            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);

            // Export a clone of the output trace handle to the function's caller.
            // let mut output_trace = TraceRc::make_from(agent).0;
            *result_trace = Some(output_reader.clone());

            // let mut thinker1 = history_replay_prior::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            // let mut thinker = history_replay::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            let mut new_interesting_times = Vec::<G::Timestamp>::new();

            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
            // as well as capabilities for these times (or their lower envelope, at least).
            let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new();
            let mut capabilities = Vec::<Capability<G::Timestamp>>::new();

            // buffers and logic for computing per-key interesting times "efficiently".
            let mut interesting_times = Vec::<G::Timestamp>::new();

            // Upper and lower frontiers for the pending input and output batches to process.
            let mut upper_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut lower_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            // Output batches may need to be built piecemeal, and these temp storage help there.
            let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            // Worker index; used only in the diagnostic prints further below.
            let id = trace.stream.scope().index();

            move |input, output| {

                // The `reduce` operator receives fully formed batches, which each serve as an indication
                // that the frontier has advanced to the upper bound of their description.
                //
                // Although we could act on each individually, several may have been sent, and it makes
                // sense to accumulate them first to coordinate their re-evaluation. We will need to pay
                // attention to which times need to be collected under which capability, so that we can
                // assemble output batches correctly. We will maintain several builders concurrently, and
                // place output updates into the appropriate builder.
                //
                // It turns out we must use notificators, as we cannot await empty batches from arrange to
                // indicate progress, as the arrange may not hold the capability to send such. Instead, we
                // must watch for progress here (and the upper bound of received batches) to tell us how
                // far we can process work.
                //
                // We really want to retire all batches we receive, so we want a frontier which reflects
                // both information from batches as well as progress information. I think this means that
                // we keep times that are greater than or equal to a time in the other frontier, deduplicated.

                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrade previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                // Drain the input stream of batches, validating the contiguity of the batch descriptions and
                // capturing a cursor for each of the batches as well as ensuring we hold a capability for the
                // times in the batch.
                input.for_each(|capability, batches| {

                    for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }

                    // Ensure that `capabilities` covers the capability of the batch.
                    capabilities.retain(|cap| !capability.time().less_than(cap.time()));
                    if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) {
                        capabilities.push(capability.retain());
                    }
                });

                // Pull in any subsequent empty batches we believe to exist.
                source_trace.advance_upper(&mut upper_limit);

                // Only if our upper limit has advanced should we do work.
                if upper_limit != lower_limit {

                    // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send
                    // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches
                    // to indicate forward progress, and must hope that downstream operators look at progress frontiers
                    // as well as batch descriptions.
                    //
                    // We can (and should) advance source and output traces if `upper_limit` indicates this is possible.
                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

                        // `interesting` contains "warnings" about keys and times that may need to be re-considered.
                        // We first extract those times from this list that lie in the interval we will process.
                        sort_dedup(&mut interesting);
                        // `exposed` contains interesting (key, time)s now below `upper_limit`
                        let mut exposed_keys = T1::KeyContainer::with_capacity(0);
                        let mut exposed_time = T1::TimeContainer::with_capacity(0);
                        // Keep pairs greater or equal to `upper_limit`, and "expose" other pairs.
                        interesting.retain(|(key, time)| {
                            if upper_limit.less_equal(time) { true } else {
                                exposed_keys.push_own(key);
                                exposed_time.push_own(time);
                                false
                            }
                        });

                        // Prepare an output buffer and builder for each capability.
                        //
                        // We buffer and build separately, as outputs are produced grouped by time, whereas the
                        // builder wants to see outputs grouped by value. While the per-key computation could
                        // do the re-sorting itself, buffering per-key outputs lets us double check the results
                        // against other implementations for accuracy.
                        //
                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
                        //       this as long as it requires that there is only one capability for each message.
                        let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new();
                        let mut builders = Vec::new();
                        for cap in capabilities.iter() {
                            buffers.push((cap.time().clone(), Vec::new()));
                            builders.push(Bu::new());
                        }

                        // Scratch container, reused for each update pushed into a builder.
                        let mut buffer = Bu::Input::default();

                        // cursors for navigating input and output traces.
                        let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
                        let source_storage = &source_storage;
                        let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
                        let output_storage = &output_storage;
                        let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);
                        let batch_storage = &batch_storage;

                        let mut thinker = history_replay::HistoryReplayer::new();

                        // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`.
                        //
                        // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length
                        // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
                        // There could perhaps be a less provocative variable name.
                        let mut exposed_position = 0;
                        while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() {

                            // Determine the next key we will work on; could be synthetic, could be from a batch.
                            let key1 = exposed_keys.get(exposed_position);
                            let key2 = batch_cursor.get_key(batch_storage);
                            let key = match (key1, key2) {
                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                                (Some(key1), None)       => key1,
                                (None, Some(key2))       => key2,
                                (None, None)             => unreachable!(),
                            };

                            // `interesting_times` contains those times between `lower_issued` and `upper_limit`
                            // that we need to re-consider. We now populate it, but perhaps this should be left
                            // to the per-key computation, which may be able to avoid examining the times of some
                            // values (for example, in the case of min/max/topk).
                            interesting_times.clear();

                            // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
                            while exposed_keys.get(exposed_position) == Some(key) {
                                interesting_times.push(T1::owned_time(exposed_time.index(exposed_position)));
                                exposed_position += 1;
                            }

                            // tidy up times, removing redundancy.
                            sort_dedup(&mut interesting_times);

                            // do the per-key computation.
                            let _counters = thinker.compute(
                                key,
                                (&mut source_cursor, source_storage),
                                (&mut output_cursor, output_storage),
                                (&mut batch_cursor, batch_storage),
                                &mut interesting_times,
                                &mut logic,
                                &upper_limit,
                                &mut buffers[..],
                                &mut new_interesting_times,
                            );

                            // Advance past this key in the batch input, if it was drawn from there.
                            if batch_cursor.get_key(batch_storage) == Some(key) {
                                batch_cursor.step_key(batch_storage);
                            }

                            // Record future warnings about interesting times (and assert they should be "future").
                            for time in new_interesting_times.drain(..) {
                                debug_assert!(upper_limit.less_equal(&time));
                                interesting.push((T1::owned_key(key), time));
                            }

                            // Sort each buffer by value and move into the corresponding builder.
                            // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
                            //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
                            //       arbitrarily ordered times.
                            for index in 0 .. buffers.len() {
                                buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                                for (val, time, diff) in buffers[index].1.drain(..) {
                                    buffer.push_into(((T1::owned_key(key), val), time, diff));
                                    builders[index].push(&mut buffer);
                                    buffer.clear();
                                }
                            }
                        }

                        // We start sealing output batches from the lower limit (previous upper limit).
                        // In principle, we could update `lower_limit` itself, and it should arrive at
                        // `upper_limit` by the end of the process.
                        output_lower.clear();
                        output_lower.extend(lower_limit.borrow().iter().cloned());

                        // build and ship each batch (because only one capability per message).
                        for (index, builder) in builders.drain(..).enumerate() {

                            // Form the upper limit of the next batch, which includes all times greater
                            // than the input batch, or the capabilities from i + 1 onward.
                            output_upper.clear();
                            output_upper.extend(upper_limit.borrow().iter().cloned());
                            for capability in &capabilities[index + 1 ..] {
                                output_upper.insert(capability.time().clone());
                            }

                            if output_upper.borrow() != output_lower.borrow() {

                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
                                let batch = builder.done(description);

                                // ship batch to the output, and commit to the output trace.
                                output.session(&capabilities[index]).give(batch.clone());
                                output_writer.insert(batch, Some(capabilities[index].time().clone()));

                                // The next batch picks up where this one left off.
                                output_lower.clear();
                                output_lower.extend(output_upper.borrow().iter().cloned());
                            }
                        }

                        // This should be true, as the final iteration introduces no capabilities, and
                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
                        assert!(output_upper.borrow() == upper_limit.borrow());

                        // Determine the frontier of our interesting times.
                        let mut frontier = Antichain::<G::Timestamp>::new();
                        for (_, time) in &interesting {
                            frontier.insert_ref(time);
                        }

                        // Update `capabilities` to reflect interesting pairs described by `frontier`.
                        let mut new_capabilities = Vec::new();
                        for time in frontier.borrow().iter() {
                            if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.push(cap.delayed(time));
                            }
                            else {
                                // This should not happen; surface diagnostics rather than panic.
                                println!("{}:\tfailed to find capability less than new frontier time:", id);
                                println!("{}:\t  time: {:?}", id, time);
                                println!("{}:\t  caps: {:?}", id, capabilities);
                                println!("{}:\t  uppr: {:?}", id, upper_limit);
                            }
                        }
                        capabilities = new_capabilities;

                        // ensure that observed progress is reflected in the output.
                        output_writer.seal(upper_limit.clone());
                    }
                    else {
                        output_writer.seal(upper_limit.clone());
                    }

                    // We only anticipate future times in advance of `upper_limit`.
                    source_trace.set_logical_compaction(upper_limit.borrow());
                    output_reader.set_logical_compaction(upper_limit.borrow());

                    // We will only slice the data between future batches.
                    source_trace.set_physical_compaction(upper_limit.borrow());
                    output_reader.set_physical_compaction(upper_limit.borrow());
                }

                // Exert trace maintenance if we have been so requested.
                output_writer.exert();
            }
        }
    )
    };

    Arranged { stream, trace: result_trace.unwrap() }
}
622
623
#[inline(never)]
fn sort_dedup<T: Ord>(items: &mut Vec<T>) {
    // Cheap prepass: collapse runs of already-consecutive duplicates before
    // paying for the sort.
    items.dedup();
    // A stable sort brings every remaining duplicate together ...
    items.sort();
    // ... so one final pass leaves each distinct value exactly once.
    items.dedup();
}
630
/// A strategy for computing per-key output updates from input, output, and batch cursors.
///
/// Implementors replay the updates for a single key in time order, re-evaluating the
/// user-supplied `logic` at times that may require it, and recording both produced
/// output updates and times that must be revisited in the future.
trait PerKeyCompute<'a, C1, C2, C3, V>
where
    C1: Cursor,
    C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
    C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
    V: Clone + Ord,
{
    /// Creates a new instance of the compute strategy.
    fn new() -> Self;
    /// Processes one key's updates drawn from the source trace, output trace, and new batch.
    ///
    /// `times` supplies previously recorded "interesting" times for the key; `upper_limit`
    /// bounds the times that may be processed now. Produced updates are appended to the
    /// capability-indexed `outputs` lists, and times that cannot yet be processed are
    /// appended to `new_interesting`. Returns a pair of counters: the number of times
    /// `logic` was invoked, and the number of times at which output updates were produced.
    fn compute<L>(
        &mut self,
        key: C1::Key<'a>,
        source_cursor: (&mut C1, &'a C1::Storage),
        output_cursor: (&mut C2, &'a C2::Storage),
        batch_cursor: (&mut C3, &'a C3::Storage),
        times: &mut Vec<C1::Time>,
        logic: &mut L,
        upper_limit: &Antichain<C1::Time>,
        outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
        new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
    where
        L: FnMut(
            C1::Key<'a>,
            &[(C1::Val<'a>, C1::Diff)],
            &mut Vec<(V, C2::Diff)>,
            &mut Vec<(V, C2::Diff)>,
        );
}
658
659
660/// Implementation based on replaying historical and new updates together.
661mod history_replay {
662
663    use timely::progress::Antichain;
664    use timely::PartialOrder;
665
666    use crate::lattice::Lattice;
667    use crate::trace::Cursor;
668    use crate::operators::ValueHistory;
669
670    use super::{PerKeyCompute, sort_dedup};
671
    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
    pub struct HistoryReplayer<'a, C1, C2, C3, V>
    where
        C1: Cursor,
        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        V: Clone + Ord,
    {
        /// Replayable history of updates drawn from the source (input) trace.
        input_history: ValueHistory<'a, C1>,
        /// Replayable history of updates drawn from the output trace.
        output_history: ValueHistory<'a, C2>,
        /// Replayable history of updates drawn from the newly received batch.
        batch_history: ValueHistory<'a, C3>,
        /// Scratch: accumulated input `(value, diff)` pairs at the time under consideration.
        input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
        /// Scratch: accumulated output `(value, diff)` pairs at the time under consideration.
        output_buffer: Vec<(V, C2::Diff)>,
        /// Scratch: updates produced by the user logic, consolidated before being recorded.
        update_buffer: Vec<(V, C2::Diff)>,
        /// Output updates produced so far for the current key, with times advanced by the running meet.
        output_produced: Vec<((V, C2::Time), C2::Diff)>,
        /// Synthetic times (joins of observed times) still awaiting processing; kept sorted descending.
        synth_times: Vec<C1::Time>,
        /// Meets of each suffix of the supplied `times` list.
        meets: Vec<C1::Time>,
        /// Times already encountered this call, retained so later times can be marked interesting.
        times_current: Vec<C1::Time>,
        /// Scratch list of candidate future times, sorted and deduplicated before use.
        temporary: Vec<C1::Time>,
    }
693
    impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V>
    where
        C1: Cursor,
        C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        V: Clone + Ord,
    {
        /// Creates a replayer with all histories and scratch buffers empty.
        ///
        /// The buffers are cleared and refilled by each `compute` call, so a single
        /// instance may be reused across calls without repeated allocation.
        fn new() -> Self {
            HistoryReplayer {
                input_history: ValueHistory::new(),
                output_history: ValueHistory::new(),
                batch_history: ValueHistory::new(),
                input_buffer: Vec::new(),
                output_buffer: Vec::new(),
                update_buffer: Vec::new(),
                output_produced: Vec::new(),
                synth_times: Vec::new(),
                meets: Vec::new(),
                times_current: Vec::new(),
                temporary: Vec::new(),
            }
        }
        /// Replays the batch, input, and output histories for `key` in time order.
        ///
        /// The user `logic` is re-evaluated at each "interesting" time not in advance of
        /// `upper_limit`; resulting updates are appended to the capability-indexed `outputs`
        /// lists. Times in advance of `upper_limit` that may need reconsideration are
        /// appended to `new_interesting`, which is left sorted and deduplicated.
        ///
        /// Returns `(compute_counter, output_counter)`: the number of times `logic` was
        /// invoked, and the number of times at which it produced output updates.
        #[inline(never)]
        fn compute<L>(
            &mut self,
            key: C1::Key<'a>,
            (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
            (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
            (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
            times: &mut Vec<C1::Time>,
            logic: &mut L,
            upper_limit: &Antichain<C1::Time>,
            outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
            new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
        where
            L: FnMut(
                C1::Key<'a>,
                &[(C1::Val<'a>, C1::Diff)],
                &mut Vec<(V, C2::Diff)>,
                &mut Vec<(V, C2::Diff)>,
            )
        {

            // The work we need to perform is at times defined principally by the contents of `batch_cursor`
            // and `times`, respectively "new work we just received" and "old times we were warned about".
            //
            // Our first step is to identify these times, so that we can use them to restrict the amount of
            // information we need to recover from `input` and `output`; as all times of interest will have
            // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
            // loaded times by performing the lattice `join` with this value.

            // Load the batch contents.
            let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time));

            // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
            // can be used to advance other historical times, which may consolidate their representation. As
            // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
            // history forward.

            self.meets.clear();
            self.meets.extend(times.iter().cloned());
            for index in (1 .. self.meets.len()).rev() {
                self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
            }

            // Determine the meet of times in `batch` and `times`.
            let mut meet = None;
            update_meet(&mut meet, self.meets.get(0));
            update_meet(&mut meet, batch_replay.meet());
            // if let Some(time) = self.meets.get(0) {
            //     meet = match meet {
            //         None => Some(self.meets[0].clone()),
            //         Some(x) => Some(x.meet(&self.meets[0])),
            //     };
            // }
            // if let Some(time) = batch_replay.meet() {
            //     meet = match meet {
            //         None => Some(time.clone()),
            //         Some(x) => Some(x.meet(&time)),
            //     };
            // }

            // Having determined the meet, we can load the input and output histories, where we
            // advance all times by joining them with `meet`. The resulting times are more compact
            // and guaranteed to accumulate identically for times greater or equal to `meet`.

            // Load the input and output histories.
            let mut input_replay = if let Some(meet) = meet.as_ref() {
                self.input_history.replay_key(source_cursor, source_storage, key, |time| {
                    let mut time = C1::owned_time(time);
                    time.join_assign(meet);
                    time
                })
            }
            else {
                self.input_history.replay_key(source_cursor, source_storage, key, |time| C1::owned_time(time))
            };
            let mut output_replay = if let Some(meet) = meet.as_ref() {
                self.output_history.replay_key(output_cursor, output_storage, key, |time| {
                    let mut time = C2::owned_time(time);
                    time.join_assign(meet);
                    time
                })
            }
            else {
                self.output_history.replay_key(output_cursor, output_storage, key, |time| C2::owned_time(time))
            };

            self.synth_times.clear();
            self.times_current.clear();
            self.output_produced.clear();

            // The frontier of times we may still consider.
            // Derived from frontiers of our update histories, supplied times, and synthetic times.

            let mut times_slice = &times[..];
            let mut meets_slice = &self.meets[..];

            let mut compute_counter = 0;
            let mut output_counter = 0;

            // We have candidate times from `batch` and `times`, as well as times identified by either
            // `input` or `output`. Finally, we may have synthetic times produced as the join of times
            // we consider in the course of evaluation. As long as any of these times exist, we need to
            // keep examining times.
            while let Some(next_time) = [   batch_replay.time(),
                                            times_slice.first(),
                                            input_replay.time(),
                                            output_replay.time(),
                                            self.synth_times.last(),
                                        ].iter().cloned().flatten().min().cloned() {

                // Advance input and output history replayers. This marks applicable updates as active.
                input_replay.step_while_time_is(&next_time);
                output_replay.step_while_time_is(&next_time);

                // One of our goals is to determine if `next_time` is "interesting", meaning whether we
                // have any evidence that we should re-evaluate the user logic at this time. For a time
                // to be "interesting" it would need to be the join of times that include either a time
                // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.

                // Advance batch history, and capture whether an update exists at `next_time`.
                let mut interesting = batch_replay.step_while_time_is(&next_time);
                if interesting {
                    if let Some(meet) = meet.as_ref() {
                        batch_replay.advance_buffer_by(meet);
                    }
                }

                // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
                while self.synth_times.last() == Some(&next_time) {
                    // We don't know enough about `next_time` to avoid putting it in to `times_current`.
                    // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
                    self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
                    interesting = true;
                }
                while times_slice.first() == Some(&next_time) {
                    // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
                    self.times_current.push(times_slice[0].clone());
                    times_slice = &times_slice[1..];
                    meets_slice = &meets_slice[1..];
                    interesting = true;
                }

                // Times could also be interesting if an interesting time is less than them, as they would join
                // and become the time itself. They may not equal the current time because whatever frontier we
                // are tracking may not have advanced far enough.
                // TODO: `batch_history` may or may not be super compact at this point, and so this check might
                //       yield false positives if not sufficiently compact. Maybe we should into this and see.
                interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
                interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));

                // We should only process times that are not in advance of `upper_limit`.
                //
                // We have no particular guarantee that known times will not be in advance of `upper_limit`.
                // We may have the guarantee that synthetic times will not be, as we test against the limit
                // before we add the time to `synth_times`.
                if !upper_limit.less_equal(&next_time) {

                    // We should re-evaluate the computation if this is an interesting time.
                    // If the time is uninteresting (and our logic is sound) it is not possible for there to be
                    // output produced. This sounds like a good test to have for debug builds!
                    if interesting {

                        compute_counter += 1;

                        // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
                        debug_assert!(self.input_buffer.is_empty());
                        meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet));
                        for &((value, ref time), ref diff) in input_replay.buffer().iter() {
                            if time.less_equal(&next_time) {
                                self.input_buffer.push((value, diff.clone()));
                            }
                            else {
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        for &((value, ref time), ref diff) in batch_replay.buffer().iter() {
                            if time.less_equal(&next_time) {
                                self.input_buffer.push((value, diff.clone()));
                            }
                            else {
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        crate::consolidation::consolidate(&mut self.input_buffer);

                        meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet));
                        for &((value, ref time), ref diff) in output_replay.buffer().iter() {
                            if time.less_equal(&next_time) {
                                self.output_buffer.push((C2::owned_val(value), diff.clone()));
                            }
                            else {
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        for &((ref value, ref time), ref diff) in self.output_produced.iter() {
                            if time.less_equal(&next_time) {
                                self.output_buffer.push(((*value).to_owned(), diff.clone()));
                            }
                            else {
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        crate::consolidation::consolidate(&mut self.output_buffer);

                        // Apply user logic if non-empty input and see what happens!
                        if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
                            logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
                            self.input_buffer.clear();
                            self.output_buffer.clear();
                        }

                        // output_replay.advance_buffer_by(&meet);
                        // for &((ref value, ref time), diff) in output_replay.buffer().iter() {
                        //     if time.less_equal(&next_time) {
                        //         self.output_buffer.push(((*value).clone(), -diff));
                        //     }
                        //     else {
                        //         self.temporary.push(next_time.join(time));
                        //     }
                        // }
                        // for &((ref value, ref time), diff) in self.output_produced.iter() {
                        //     if time.less_equal(&next_time) {
                        //         self.output_buffer.push(((*value).clone(), -diff));
                        //     }
                        //     else {
                        //         self.temporary.push(next_time.join(&time));
                        //     }
                        // }

                        // Having subtracted output updates from user output, consolidate the results to determine
                        // if there is anything worth reporting. Note: this also orders the results by value, so
                        // that could make the above merging plan even easier.
                        crate::consolidation::consolidate(&mut self.update_buffer);

                        // Stash produced updates into both capability-indexed buffers and `output_produced`.
                        // The two locations are important, in that we will compact `output_produced` as we move
                        // through times, but we cannot compact the output buffers because we need their actual
                        // times.
                        if !self.update_buffer.is_empty() {

                            output_counter += 1;

                            // We *should* be able to find a capability for `next_time`. Any thing else would
                            // indicate a logical error somewhere along the way; either we release a capability
                            // we should have kept, or we have computed the output incorrectly (or both!)
                            let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
                            let idx = outputs.len() - idx.expect("failed to find index") - 1;
                            for (val, diff) in self.update_buffer.drain(..) {
                                self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
                                outputs[idx].1.push((val, next_time.clone(), diff));
                            }

                            // Advance times in `self.output_produced` and consolidate the representation.
                            // NOTE: We only do this when we add records; it could be that there are situations
                            //       where we want to consolidate even without changes (because an initially
                            //       large collection can now be collapsed).
                            if let Some(meet) = meet.as_ref() {
                                for entry in &mut self.output_produced {
                                    (entry.0).1 = (entry.0).1.join(meet);
                                }
                            }
                            crate::consolidation::consolidate(&mut self.output_produced);
                        }
                    }

                    // Determine synthetic interesting times.
                    //
                    // Synthetic interesting times are produced differently for interesting and uninteresting
                    // times. An uninteresting time must join with an interesting time to become interesting,
                    // which means joins with `self.batch_history` and  `self.times_current`. I think we can
                    // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
                    // joined against everything.

                    // Any time, even uninteresting times, must be joined with the current accumulation of
                    // batch times as well as the current accumulation of `times_current`.
                    for &((_, ref time), _) in batch_replay.buffer().iter() {
                        if !time.less_equal(&next_time) {
                            self.temporary.push(time.join(&next_time));
                        }
                    }
                    for time in self.times_current.iter() {
                        if !time.less_equal(&next_time) {
                            self.temporary.push(time.join(&next_time));
                        }
                    }

                    sort_dedup(&mut self.temporary);

                    // Introduce synthetic times, and re-organize if we add any.
                    let synth_len = self.synth_times.len();
                    for time in self.temporary.drain(..) {
                        // We can either service `join` now, or must delay for the future.
                        if upper_limit.less_equal(&time) {
                            debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
                            new_interesting.push(time);
                        }
                        else {
                            self.synth_times.push(time);
                        }
                    }
                    if self.synth_times.len() > synth_len {
                        // Sorted descending, so that `.last()` yields the least remaining synthetic time.
                        self.synth_times.sort_by(|x,y| y.cmp(x));
                        self.synth_times.dedup();
                    }
                }
                else if interesting {
                    // We cannot process `next_time` now, and must delay it.
                    //
                    // I think we are probably only here because of an uninteresting time declared interesting,
                    // as initial interesting times are filtered to be in interval, and synthetic times are also
                    // filtered before introducing them to `self.synth_times`.
                    new_interesting.push(next_time.clone());
                    debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
                }


                // Update `meet` to track the meet of each source of times.
                meet = None;//T::maximum();
                update_meet(&mut meet, batch_replay.meet());
                update_meet(&mut meet, input_replay.meet());
                update_meet(&mut meet, output_replay.meet());
                for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
                // if let Some(time) = batch_replay.meet() { meet = meet.meet(time); }
                // if let Some(time) = input_replay.meet() { meet = meet.meet(time); }
                // if let Some(time) = output_replay.meet() { meet = meet.meet(time); }
                // for time in self.synth_times.iter() { meet = meet.meet(time); }
                update_meet(&mut meet, meets_slice.first());
                // if let Some(time) = meets_slice.first() { meet = meet.meet(time); }

                // Update `times_current` by the frontier.
                if let Some(meet) = meet.as_ref() {
                    for time in self.times_current.iter_mut() {
                        *time = time.join(meet);
                    }
                }

                sort_dedup(&mut self.times_current);
            }

            // Normalize the representation of `new_interesting`, deduplicating and ordering.
            sort_dedup(new_interesting);

            (compute_counter, output_counter)
        }
    }
1061
1062    /// Updates an optional meet by an optional time.
1063    fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
1064        if let Some(time) = other {
1065            if let Some(meet) = meet.as_mut() {
1066                *meet = meet.meet(time);
1067            }
1068            if meet.is_none() {
1069                *meet = Some(time.clone());
1070            }
1071        }
1072    }
1073}