differential_dataflow/operators/reduce.rs
1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use timely::Container;
9use timely::container::PushInto;
10use crate::hashable::Hashable;
11use crate::{Data, ExchangeData, Collection};
12use crate::difference::{Semigroup, Abelian};
13
14use timely::order::PartialOrder;
15use timely::progress::frontier::Antichain;
16use timely::progress::Timestamp;
17use timely::dataflow::*;
18use timely::dataflow::operators::Operator;
19use timely::dataflow::channels::pact::Pipeline;
20use timely::dataflow::operators::Capability;
21
22use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
23use crate::lattice::Lattice;
24use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
25use crate::trace::cursor::CursorList;
26use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder};
27use crate::trace::implementations::containers::BatchContainer;
28
29use crate::trace::TraceReader;
30
/// Extension trait for the `reduce` differential dataflow method.
pub trait Reduce<G: Scope<Timestamp: Lattice+Ord>, K: Data, V: Data, R: Semigroup> {
    /// Applies a reduction function on records grouped by key.
    ///
    /// Input data must be structured as `(key, val)` pairs.
    /// The user-supplied reduction function takes as arguments
    ///
    /// 1. a reference to the key,
    /// 2. a reference to the slice of values and their accumulated updates,
    /// 3. a mutable reference to a vector to populate with output values and accumulated updates.
    ///
    /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
    /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
    /// `Ord` implementations.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Reduce;
    ///
    /// ::timely::example(|scope| {
    ///     // report the smallest value for each group
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| (x / 3, x))
    ///          .reduce(|_key, input, output| {
    ///              output.push((*input[0].0, 1))
    ///          });
    /// });
    /// ```
    fn reduce<L, V2: Data, R2: Ord+Abelian+'static>(&self, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
        // Delegate to the named variant with a default operator name.
        self.reduce_named("Reduce", logic)
    }

    /// As `reduce` with the ability to name the operator.
    fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static;
}
70
71impl<G, K, V, R> Reduce<G, K, V, R> for Collection<G, (K, V), R>
72 where
73 G: Scope<Timestamp: Lattice+Ord>,
74 K: ExchangeData+Hashable,
75 V: ExchangeData,
76 R: ExchangeData+Semigroup,
77 {
78 fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
79 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
80 self.arrange_by_key_named(&format!("Arrange: {}", name))
81 .reduce_named(name, logic)
82 }
83}
84
85impl<G, K: Data, V: Data, T1, R: Ord+Semigroup+'static> Reduce<G, K, V, R> for Arranged<G, T1>
86where
87 G: Scope<Timestamp=T1::Time>,
88 T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a V, Diff=R>+Clone+'static,
89{
90 fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
91 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
92 self.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
93 .as_collection(|k,v| (k.clone(), v.clone()))
94 }
95}
96
/// Extension trait for the `threshold` and `distinct` differential dataflow methods.
pub trait Threshold<G: Scope<Timestamp: Lattice+Ord>, K: Data, R1: Semigroup> {
    /// Transforms the multiplicity of records.
    ///
    /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
    /// least the computation may behave as if it does. Otherwise, the transformation
    /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .threshold(|_,c| c % 2);
    /// });
    /// ```
    fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        // Delegate to the named variant with a default operator name.
        self.threshold_named("Threshold", thresh)
    }

    /// A `threshold` with the ability to name the operator.
    fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2>;

    /// Reduces the collection to one occurrence of each distinct element.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .distinct();
    /// });
    /// ```
    fn distinct(&self) -> Collection<G, K, isize> {
        self.distinct_core()
    }

    /// Distinct for general integer differences.
    ///
    /// This method allows `distinct` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(&self) -> Collection<G, K, R2> {
        // Every present element is assigned multiplicity one, ignoring its input count.
        self.threshold_named("Distinct", |_,_| R2::from(1i8))
    }
}
153
154impl<G: Scope<Timestamp: Lattice+Ord>, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1> for Collection<G, K, R1> {
155 fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
156 self.arrange_by_self_named(&format!("Arrange: {}", name))
157 .threshold_named(name, thresh)
158 }
159}
160
161impl<G, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
162where
163 G: Scope<Timestamp=T1::Time>,
164 T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R1>+Clone+'static,
165{
166 fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
167 self.reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
168 .as_collection(|k,_| k.clone())
169 }
170}
171
/// Extension trait for the `count` differential dataflow method.
pub trait Count<G: Scope<Timestamp: Lattice+Ord>, K: Data, R: Semigroup> {
    /// Counts the number of occurrences of each element.
    ///
    /// The accumulated count appears as the second component of the output
    /// pair, and the output collection uses `isize` differences.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Count;
    ///
    /// ::timely::example(|scope| {
    ///     // report the number of occurrences of each key
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .count();
    /// });
    /// ```
    fn count(&self) -> Collection<G, (K, R), isize> {
        // Delegate to the general version, defaulting to `isize` differences.
        self.count_core()
    }

    /// Count for general integer differences.
    ///
    /// This method allows `count` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2>;
}
200
201impl<G: Scope<Timestamp: Lattice+Ord>, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K, R> for Collection<G, K, R> {
202 fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
203 self.arrange_by_self_named("Arrange: Count")
204 .count_core()
205 }
206}
207
208impl<G, K: Data, T1, R: Data+Semigroup> Count<G, K, R> for Arranged<G, T1>
209where
210 G: Scope<Timestamp=T1::Time>,
211 T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwn = K, Val<'a>=&'a (), Diff=R>+Clone+'static,
212{
213 fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
214 self.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
215 .as_collection(|k,c| (k.clone(), c.clone()))
216 }
217}
218
/// Extension trait for the `reduce_core` differential dataflow method.
pub trait ReduceCore<G: Scope<Timestamp: Lattice+Ord>, K: ToOwned + ?Sized, V: Data, R: Semigroup> {
    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
    ///
    /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
    /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::reduce::ReduceCore;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let trace =
    ///     scope.new_collection_from(1 .. 10u32).1
    ///          .map(|x| (x, x))
    ///          .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
    ///              "Example",
    ///              move |_key, src, dst| dst.push((*src[0].0, 1))
    ///          )
    ///          .trace;
    /// });
    /// ```
    fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<
                Key<'a>= &'a K,
                KeyOwn = K,
                ValOwn = V,
                Time=G::Timestamp,
                Diff: Abelian,
            >+'static,
            Bu: Builder<Time=T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
        {
            self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
                // Invoke user logic only for non-empty inputs, matching the `reduce` contract.
                if !input.is_empty() {
                    logic(key, input, change);
                }
                // Subtract the previously produced output: negate the would-be outputs and
                // consolidate, leaving exactly the net updates needed to correct the output trace.
                // This is where the `Diff: Abelian` requirement (negation) is used.
                change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
                crate::consolidation::consolidate(change);
            })
        }

    /// Solves for output updates when presented with inputs and would-be outputs.
    ///
    /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
    /// and it may not be safe to index into the first element.
    /// At least one of the two collections will be non-empty.
    fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<
                Key<'a>=&'a K,
                KeyOwn = K,
                ValOwn = V,
                Time=G::Timestamp,
            >+'static,
            Bu: Builder<Time=T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
        ;
}
284
285impl<G, K, V, R> ReduceCore<G, K, V, R> for Collection<G, (K, V), R>
286where
287 G: Scope,
288 G::Timestamp: Lattice+Ord,
289 K: ExchangeData+Hashable,
290 V: ExchangeData,
291 R: ExchangeData+Semigroup,
292{
293 fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
294 where
295 V: Data,
296 T2: for<'a> Trace<
297 Key<'a>=&'a K,
298 KeyOwn = K,
299 ValOwn = V,
300 Time=G::Timestamp,
301 >+'static,
302 Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
303 L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
304 {
305 self.arrange_by_key_named(&format!("Arrange: {}", name))
306 .reduce_core::<_,Bu,_>(name, logic)
307 }
308}
309
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
///
/// The `logic` callback receives, per key: the key, the accumulated input values, a buffer of
/// would-be output values to amend, and a buffer into which net output updates are written
/// (see `ReduceCore::reduce_core` for the intended contract of these buffers).
pub fn reduce_trace<G, T1, Bu, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
    G: Scope<Timestamp=T1::Time>,
    T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
    T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
    Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
    L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
    // Populated inside the operator constructor with a handle to the output trace.
    let mut result_trace = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let stream = {

        let result_trace = &mut result_trace;
        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

            // Acquire a logger for arrange events.
            let logger = trace.stream.scope().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

            let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
            let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
            // If there is default exert logic set, install it.
            if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
                empty.set_exert_logic(exert_logic);
            }


            let mut source_trace = trace.trace.clone();

            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);

            // let mut output_trace = TraceRc::make_from(agent).0;
            *result_trace = Some(output_reader.clone());

            // let mut thinker1 = history_replay_prior::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            // let mut thinker = history_replay::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            let mut new_interesting_times = Vec::<G::Timestamp>::new();

            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
            // as well as capabilities for these times (or their lower envelope, at least).
            let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new();
            let mut capabilities = Vec::<Capability<G::Timestamp>>::new();

            // buffers and logic for computing per-key interesting times "efficiently".
            let mut interesting_times = Vec::<G::Timestamp>::new();

            // Upper and lower frontiers for the pending input and output batches to process.
            let mut upper_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut lower_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            // Output batches may need to be built piecemeal, and these temp storage help there.
            let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            // Worker index, used only for the diagnostic prints further below.
            let id = trace.stream.scope().index();

            move |input, output| {

                // The `reduce` operator receives fully formed batches, which each serve as an indication
                // that the frontier has advanced to the upper bound of their description.
                //
                // Although we could act on each individually, several may have been sent, and it makes
                // sense to accumulate them first to coordinate their re-evaluation. We will need to pay
                // attention to which times need to be collected under which capability, so that we can
                // assemble output batches correctly. We will maintain several builders concurrently, and
                // place output updates into the appropriate builder.
                //
                // It turns out we must use notificators, as we cannot await empty batches from arrange to
                // indicate progress, as the arrange may not hold the capability to send such. Instead, we
                // must watch for progress here (and the upper bound of received batches) to tell us how
                // far we can process work.
                //
                // We really want to retire all batches we receive, so we want a frontier which reflects
                // both information from batches as well as progress information. I think this means that
                // we keep times that are greater than or equal to a time in the other frontier, deduplicated.

                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrade previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                // Drain the input stream of batches, validating the contiguity of the batch descriptions and
                // capturing a cursor for each of the batches as well as ensuring we hold a capability for the
                // times in the batch.
                input.for_each(|capability, batches| {

                    for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }

                    // Ensure that `capabilities` covers the capability of the batch.
                    capabilities.retain(|cap| !capability.time().less_than(cap.time()));
                    if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) {
                        capabilities.push(capability.retain());
                    }
                });

                // Pull in any subsequent empty batches we believe to exist.
                source_trace.advance_upper(&mut upper_limit);

                // Only if our upper limit has advanced should we do work.
                if upper_limit != lower_limit {

                    // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send
                    // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches
                    // to indicate forward progress, and must hope that downstream operators look at progress frontiers
                    // as well as batch descriptions.
                    //
                    // We can (and should) advance source and output traces if `upper_limit` indicates this is possible.
                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

                        // `interesting` contains "warnings" about keys and times that may need to be re-considered.
                        // We first extract those times from this list that lie in the interval we will process.
                        sort_dedup(&mut interesting);
                        // `exposed` contains interesting (key, time)s now below `upper_limit`
                        let mut exposed_keys = T1::KeyContainer::with_capacity(0);
                        let mut exposed_time = T1::TimeContainer::with_capacity(0);
                        // Keep pairs greater or equal to `upper_limit`, and "expose" other pairs.
                        interesting.retain(|(key, time)| {
                            if upper_limit.less_equal(time) { true } else {
                                exposed_keys.push_own(key);
                                exposed_time.push_own(time);
                                false
                            }
                        });

                        // Prepare an output buffer and builder for each capability.
                        //
                        // We buffer and build separately, as outputs are produced grouped by time, whereas the
                        // builder wants to see outputs grouped by value. While the per-key computation could
                        // do the re-sorting itself, buffering per-key outputs lets us double check the results
                        // against other implementations for accuracy.
                        //
                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
                        //       this as long as it requires that there is only one capability for each message.
                        let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new();
                        let mut builders = Vec::new();
                        for cap in capabilities.iter() {
                            buffers.push((cap.time().clone(), Vec::new()));
                            builders.push(Bu::new());
                        }

                        // Staging container, reused for each update pushed into a builder.
                        let mut buffer = Bu::Input::default();

                        // cursors for navigating input and output traces.
                        let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
                        let source_storage = &source_storage;
                        let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
                        let output_storage = &output_storage;
                        // Merge all received batch cursors into a single cursor over their union.
                        let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);
                        let batch_storage = &batch_storage;

                        let mut thinker = history_replay::HistoryReplayer::new();

                        // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`.
                        //
                        // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length
                        // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
                        // There could perhaps be a less provocative variable name.
                        let mut exposed_position = 0;
                        while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() {

                            // Determine the next key we will work on; could be synthetic, could be from a batch.
                            let key1 = exposed_keys.get(exposed_position);
                            let key2 = batch_cursor.get_key(batch_storage);
                            let key = match (key1, key2) {
                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                                (Some(key1), None) => key1,
                                (None, Some(key2)) => key2,
                                (None, None) => unreachable!(),
                            };

                            // `interesting_times` contains those times between `lower_issued` and `upper_limit`
                            // that we need to re-consider. We now populate it, but perhaps this should be left
                            // to the per-key computation, which may be able to avoid examining the times of some
                            // values (for example, in the case of min/max/topk).
                            interesting_times.clear();

                            // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
                            while exposed_keys.get(exposed_position) == Some(key) {
                                interesting_times.push(T1::owned_time(exposed_time.index(exposed_position)));
                                exposed_position += 1;
                            }

                            // tidy up times, removing redundancy.
                            sort_dedup(&mut interesting_times);

                            // do the per-key computation.
                            let _counters = thinker.compute(
                                key,
                                (&mut source_cursor, source_storage),
                                (&mut output_cursor, output_storage),
                                (&mut batch_cursor, batch_storage),
                                &mut interesting_times,
                                &mut logic,
                                &upper_limit,
                                &mut buffers[..],
                                &mut new_interesting_times,
                            );

                            // Advance the batch cursor past this key if it contributed to it.
                            if batch_cursor.get_key(batch_storage) == Some(key) {
                                batch_cursor.step_key(batch_storage);
                            }

                            // Record future warnings about interesting times (and assert they should be "future").
                            for time in new_interesting_times.drain(..) {
                                debug_assert!(upper_limit.less_equal(&time));
                                interesting.push((T1::owned_key(key), time));
                            }

                            // Sort each buffer by value and move into the corresponding builder.
                            // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
                            //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
                            //       arbitrarily ordered times.
                            for index in 0 .. buffers.len() {
                                buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                                for (val, time, diff) in buffers[index].1.drain(..) {
                                    buffer.push_into(((T1::owned_key(key), val), time, diff));
                                    builders[index].push(&mut buffer);
                                    buffer.clear();
                                }
                            }
                        }

                        // We start sealing output batches from the lower limit (previous upper limit).
                        // In principle, we could update `lower_limit` itself, and it should arrive at
                        // `upper_limit` by the end of the process.
                        output_lower.clear();
                        output_lower.extend(lower_limit.borrow().iter().cloned());

                        // build and ship each batch (because only one capability per message).
                        for (index, builder) in builders.drain(..).enumerate() {

                            // Form the upper limit of the next batch, which includes all times greater
                            // than the input batch, or the capabilities from i + 1 onward.
                            output_upper.clear();
                            output_upper.extend(upper_limit.borrow().iter().cloned());
                            for capability in &capabilities[index + 1 ..] {
                                output_upper.insert(capability.time().clone());
                            }

                            // Skip degenerate batches whose bounds coincide (they would be empty).
                            if output_upper.borrow() != output_lower.borrow() {

                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
                                let batch = builder.done(description);

                                // ship batch to the output, and commit to the output trace.
                                output.session(&capabilities[index]).give(batch.clone());
                                output_writer.insert(batch, Some(capabilities[index].time().clone()));

                                output_lower.clear();
                                output_lower.extend(output_upper.borrow().iter().cloned());
                            }
                        }

                        // This should be true, as the final iteration introduces no capabilities, and
                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
                        assert!(output_upper.borrow() == upper_limit.borrow());

                        // Determine the frontier of our interesting times.
                        let mut frontier = Antichain::<G::Timestamp>::new();
                        for (_, time) in &interesting {
                            frontier.insert_ref(time);
                        }

                        // Update `capabilities` to reflect interesting pairs described by `frontier`.
                        let mut new_capabilities = Vec::new();
                        for time in frontier.borrow().iter() {
                            if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.push(cap.delayed(time));
                            }
                            else {
                                println!("{}:\tfailed to find capability less than new frontier time:", id);
                                println!("{}:\t  time: {:?}", id, time);
                                println!("{}:\t  caps: {:?}", id, capabilities);
                                println!("{}:\t  uppr: {:?}", id, upper_limit);
                            }
                        }
                        capabilities = new_capabilities;

                        // ensure that observed progress is reflected in the output.
                        output_writer.seal(upper_limit.clone());
                    }
                    else {
                        output_writer.seal(upper_limit.clone());
                    }

                    // We only anticipate future times in advance of `upper_limit`.
                    source_trace.set_logical_compaction(upper_limit.borrow());
                    output_reader.set_logical_compaction(upper_limit.borrow());

                    // We will only slice the data between future batches.
                    source_trace.set_physical_compaction(upper_limit.borrow());
                    output_reader.set_physical_compaction(upper_limit.borrow());
                }

                // Exert trace maintenance if we have been so requested.
                output_writer.exert();
            }
        }
        )
    };

    Arranged { stream, trace: result_trace.unwrap() }
}
622
623
#[inline(never)]
fn sort_dedup<T: Ord>(items: &mut Vec<T>) {
    // Drop already-adjacent duplicates first (cheap pre-pass), then establish
    // a total order and remove the remaining duplicates.
    items.dedup();
    items.sort();
    items.dedup();
}
630
/// Strategy for resolving the per-key reduction computation.
///
/// Implementors are handed, one key at a time, cursors over the source trace (`C1`), the
/// previously produced output trace (`C2`), and newly received batches (`C3`), plus the
/// list of `times` at which the key must be reconsidered. They deposit output updates into
/// `outputs` (one buffer per capability) and report times at which the key should be
/// revisited via `new_interesting`.
trait PerKeyCompute<'a, C1, C2, C3, V>
where
    C1: Cursor,
    C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
    C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
    V: Clone + Ord,
{
    /// Creates a new, empty instance of the strategy.
    fn new() -> Self;
    /// Computes output updates and future interesting times for one key.
    ///
    /// Returns a pair of counters; judging by the implementor's variable names
    /// (`compute_counter`, `output_counter`) these tally logic invocations and
    /// produced outputs — confirm against the implementation.
    fn compute<L>(
        &mut self,
        key: C1::Key<'a>,
        source_cursor: (&mut C1, &'a C1::Storage),
        output_cursor: (&mut C2, &'a C2::Storage),
        batch_cursor: (&mut C3, &'a C3::Storage),
        times: &mut Vec<C1::Time>,
        logic: &mut L,
        upper_limit: &Antichain<C1::Time>,
        outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
        new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
    where
        L: FnMut(
            C1::Key<'a>,
            &[(C1::Val<'a>, C1::Diff)],
            &mut Vec<(V, C2::Diff)>,
            &mut Vec<(V, C2::Diff)>,
        );
}
658
659
660/// Implementation based on replaying historical and new updates together.
661mod history_replay {
662
663 use timely::progress::Antichain;
664 use timely::PartialOrder;
665
666 use crate::lattice::Lattice;
667 use crate::trace::Cursor;
668 use crate::operators::ValueHistory;
669
670 use super::{PerKeyCompute, sort_dedup};
671
    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
    pub struct HistoryReplayer<'a, C1, C2, C3, V>
    where
        C1: Cursor,
        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        V: Clone + Ord,
    {
        /// Replayable history of updates drawn from the source (input) trace.
        input_history: ValueHistory<'a, C1>,
        /// Replayable history of updates drawn from the previously produced output trace.
        output_history: ValueHistory<'a, C2>,
        /// Replayable history of updates drawn from newly received batches.
        batch_history: ValueHistory<'a, C3>,
        /// Scratch buffer of input (value, diff) pairs; presumably what is handed to the
        /// user logic — confirm against `compute`.
        input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
        /// Scratch buffer of output (value, diff) pairs.
        output_buffer: Vec<(V, C2::Diff)>,
        /// Scratch buffer for update (value, diff) pairs.
        update_buffer: Vec<(V, C2::Diff)>,
        /// Output updates produced so far, keyed by `(value, time)`.
        output_produced: Vec<((V, C2::Time), C2::Diff)>,
        /// Synthetic times produced as joins of times considered during evaluation.
        synth_times: Vec<C1::Time>,
        /// Meets of each *suffix* of the pending times, used to advance historical times.
        meets: Vec<C1::Time>,
        /// Times currently in play for the key being processed.
        times_current: Vec<C1::Time>,
        /// General-purpose temporary storage for times.
        temporary: Vec<C1::Time>,
    }
693
694 impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V>
695 where
696 C1: Cursor,
697 C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
698 C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
699 V: Clone + Ord,
700 {
701 fn new() -> Self {
702 HistoryReplayer {
703 input_history: ValueHistory::new(),
704 output_history: ValueHistory::new(),
705 batch_history: ValueHistory::new(),
706 input_buffer: Vec::new(),
707 output_buffer: Vec::new(),
708 update_buffer: Vec::new(),
709 output_produced: Vec::new(),
710 synth_times: Vec::new(),
711 meets: Vec::new(),
712 times_current: Vec::new(),
713 temporary: Vec::new(),
714 }
715 }
716 #[inline(never)]
717 fn compute<L>(
718 &mut self,
719 key: C1::Key<'a>,
720 (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
721 (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
722 (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
723 times: &mut Vec<C1::Time>,
724 logic: &mut L,
725 upper_limit: &Antichain<C1::Time>,
726 outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
727 new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
728 where
729 L: FnMut(
730 C1::Key<'a>,
731 &[(C1::Val<'a>, C1::Diff)],
732 &mut Vec<(V, C2::Diff)>,
733 &mut Vec<(V, C2::Diff)>,
734 )
735 {
736
737 // The work we need to perform is at times defined principally by the contents of `batch_cursor`
738 // and `times`, respectively "new work we just received" and "old times we were warned about".
739 //
740 // Our first step is to identify these times, so that we can use them to restrict the amount of
741 // information we need to recover from `input` and `output`; as all times of interest will have
742 // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
743 // loaded times by performing the lattice `join` with this value.
744
745 // Load the batch contents.
746 let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time));
747
748 // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
749 // can be used to advance other historical times, which may consolidate their representation. As
750 // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
751 // history forward.
752
753 self.meets.clear();
754 self.meets.extend(times.iter().cloned());
755 for index in (1 .. self.meets.len()).rev() {
756 self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
757 }
758
759 // Determine the meet of times in `batch` and `times`.
760 let mut meet = None;
761 update_meet(&mut meet, self.meets.get(0));
762 update_meet(&mut meet, batch_replay.meet());
763 // if let Some(time) = self.meets.get(0) {
764 // meet = match meet {
765 // None => Some(self.meets[0].clone()),
766 // Some(x) => Some(x.meet(&self.meets[0])),
767 // };
768 // }
769 // if let Some(time) = batch_replay.meet() {
770 // meet = match meet {
771 // None => Some(time.clone()),
772 // Some(x) => Some(x.meet(&time)),
773 // };
774 // }
775
776 // Having determined the meet, we can load the input and output histories, where we
777 // advance all times by joining them with `meet`. The resulting times are more compact
778 // and guaranteed to accumulate identically for times greater or equal to `meet`.
779
780 // Load the input and output histories.
781 let mut input_replay = if let Some(meet) = meet.as_ref() {
782 self.input_history.replay_key(source_cursor, source_storage, key, |time| {
783 let mut time = C1::owned_time(time);
784 time.join_assign(meet);
785 time
786 })
787 }
788 else {
789 self.input_history.replay_key(source_cursor, source_storage, key, |time| C1::owned_time(time))
790 };
791 let mut output_replay = if let Some(meet) = meet.as_ref() {
792 self.output_history.replay_key(output_cursor, output_storage, key, |time| {
793 let mut time = C2::owned_time(time);
794 time.join_assign(meet);
795 time
796 })
797 }
798 else {
799 self.output_history.replay_key(output_cursor, output_storage, key, |time| C2::owned_time(time))
800 };
801
802 self.synth_times.clear();
803 self.times_current.clear();
804 self.output_produced.clear();
805
806 // The frontier of times we may still consider.
807 // Derived from frontiers of our update histories, supplied times, and synthetic times.
808
809 let mut times_slice = ×[..];
810 let mut meets_slice = &self.meets[..];
811
812 let mut compute_counter = 0;
813 let mut output_counter = 0;
814
815 // We have candidate times from `batch` and `times`, as well as times identified by either
816 // `input` or `output`. Finally, we may have synthetic times produced as the join of times
817 // we consider in the course of evaluation. As long as any of these times exist, we need to
818 // keep examining times.
819 while let Some(next_time) = [ batch_replay.time(),
820 times_slice.first(),
821 input_replay.time(),
822 output_replay.time(),
823 self.synth_times.last(),
824 ].iter().cloned().flatten().min().cloned() {
825
826 // Advance input and output history replayers. This marks applicable updates as active.
827 input_replay.step_while_time_is(&next_time);
828 output_replay.step_while_time_is(&next_time);
829
830 // One of our goals is to determine if `next_time` is "interesting", meaning whether we
831 // have any evidence that we should re-evaluate the user logic at this time. For a time
832 // to be "interesting" it would need to be the join of times that include either a time
833 // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.
834
835 // Advance batch history, and capture whether an update exists at `next_time`.
836 let mut interesting = batch_replay.step_while_time_is(&next_time);
837 if interesting {
838 if let Some(meet) = meet.as_ref() {
839 batch_replay.advance_buffer_by(meet);
840 }
841 }
842
843 // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
844 while self.synth_times.last() == Some(&next_time) {
845 // We don't know enough about `next_time` to avoid putting it in to `times_current`.
846 // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
847 self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
848 interesting = true;
849 }
850 while times_slice.first() == Some(&next_time) {
851 // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
852 self.times_current.push(times_slice[0].clone());
853 times_slice = ×_slice[1..];
854 meets_slice = &meets_slice[1..];
855 interesting = true;
856 }
857
858 // Times could also be interesting if an interesting time is less than them, as they would join
859 // and become the time itself. They may not equal the current time because whatever frontier we
860 // are tracking may not have advanced far enough.
861 // TODO: `batch_history` may or may not be super compact at this point, and so this check might
862                 // yield false positives if not sufficiently compact. Maybe we should look into this and see.
863 interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
864 interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));
865
866 // We should only process times that are not in advance of `upper_limit`.
867 //
868 // We have no particular guarantee that known times will not be in advance of `upper_limit`.
869 // We may have the guarantee that synthetic times will not be, as we test against the limit
870 // before we add the time to `synth_times`.
871 if !upper_limit.less_equal(&next_time) {
872
873 // We should re-evaluate the computation if this is an interesting time.
874 // If the time is uninteresting (and our logic is sound) it is not possible for there to be
875 // output produced. This sounds like a good test to have for debug builds!
876 if interesting {
877
878 compute_counter += 1;
879
880 // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
881 debug_assert!(self.input_buffer.is_empty());
882 meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet));
883 for &((value, ref time), ref diff) in input_replay.buffer().iter() {
884 if time.less_equal(&next_time) {
885 self.input_buffer.push((value, diff.clone()));
886 }
887 else {
888 self.temporary.push(next_time.join(time));
889 }
890 }
891 for &((value, ref time), ref diff) in batch_replay.buffer().iter() {
892 if time.less_equal(&next_time) {
893 self.input_buffer.push((value, diff.clone()));
894 }
895 else {
896 self.temporary.push(next_time.join(time));
897 }
898 }
899 crate::consolidation::consolidate(&mut self.input_buffer);
900
901 meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet));
902 for &((value, ref time), ref diff) in output_replay.buffer().iter() {
903 if time.less_equal(&next_time) {
904 self.output_buffer.push((C2::owned_val(value), diff.clone()));
905 }
906 else {
907 self.temporary.push(next_time.join(time));
908 }
909 }
910 for &((ref value, ref time), ref diff) in self.output_produced.iter() {
911 if time.less_equal(&next_time) {
912 self.output_buffer.push(((*value).to_owned(), diff.clone()));
913 }
914 else {
915 self.temporary.push(next_time.join(time));
916 }
917 }
918 crate::consolidation::consolidate(&mut self.output_buffer);
919
920 // Apply user logic if non-empty input and see what happens!
921 if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
922 logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
923 self.input_buffer.clear();
924 self.output_buffer.clear();
925 }
926
927 // output_replay.advance_buffer_by(&meet);
928 // for &((ref value, ref time), diff) in output_replay.buffer().iter() {
929 // if time.less_equal(&next_time) {
930 // self.output_buffer.push(((*value).clone(), -diff));
931 // }
932 // else {
933 // self.temporary.push(next_time.join(time));
934 // }
935 // }
936 // for &((ref value, ref time), diff) in self.output_produced.iter() {
937 // if time.less_equal(&next_time) {
938 // self.output_buffer.push(((*value).clone(), -diff));
939 // }
940 // else {
941 // self.temporary.push(next_time.join(&time));
942 // }
943 // }
944
945 // Having subtracted output updates from user output, consolidate the results to determine
946 // if there is anything worth reporting. Note: this also orders the results by value, so
947 // that could make the above merging plan even easier.
948 crate::consolidation::consolidate(&mut self.update_buffer);
949
950 // Stash produced updates into both capability-indexed buffers and `output_produced`.
951 // The two locations are important, in that we will compact `output_produced` as we move
952 // through times, but we cannot compact the output buffers because we need their actual
953 // times.
954 if !self.update_buffer.is_empty() {
955
956 output_counter += 1;
957
958 // We *should* be able to find a capability for `next_time`. Any thing else would
959 // indicate a logical error somewhere along the way; either we release a capability
960 // we should have kept, or we have computed the output incorrectly (or both!)
961 let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
962 let idx = outputs.len() - idx.expect("failed to find index") - 1;
963 for (val, diff) in self.update_buffer.drain(..) {
964 self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
965 outputs[idx].1.push((val, next_time.clone(), diff));
966 }
967
968 // Advance times in `self.output_produced` and consolidate the representation.
969 // NOTE: We only do this when we add records; it could be that there are situations
970 // where we want to consolidate even without changes (because an initially
971 // large collection can now be collapsed).
972 if let Some(meet) = meet.as_ref() {
973 for entry in &mut self.output_produced {
974 (entry.0).1 = (entry.0).1.join(meet);
975 }
976 }
977 crate::consolidation::consolidate(&mut self.output_produced);
978 }
979 }
980
981 // Determine synthetic interesting times.
982 //
983 // Synthetic interesting times are produced differently for interesting and uninteresting
984 // times. An uninteresting time must join with an interesting time to become interesting,
985 // which means joins with `self.batch_history` and `self.times_current`. I think we can
986 // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
987 // joined against everything.
988
989 // Any time, even uninteresting times, must be joined with the current accumulation of
990 // batch times as well as the current accumulation of `times_current`.
991 for &((_, ref time), _) in batch_replay.buffer().iter() {
992 if !time.less_equal(&next_time) {
993 self.temporary.push(time.join(&next_time));
994 }
995 }
996 for time in self.times_current.iter() {
997 if !time.less_equal(&next_time) {
998 self.temporary.push(time.join(&next_time));
999 }
1000 }
1001
1002 sort_dedup(&mut self.temporary);
1003
1004 // Introduce synthetic times, and re-organize if we add any.
1005 let synth_len = self.synth_times.len();
1006 for time in self.temporary.drain(..) {
1007 // We can either service `join` now, or must delay for the future.
1008 if upper_limit.less_equal(&time) {
1009 debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
1010 new_interesting.push(time);
1011 }
1012 else {
1013 self.synth_times.push(time);
1014 }
1015 }
1016 if self.synth_times.len() > synth_len {
1017 self.synth_times.sort_by(|x,y| y.cmp(x));
1018 self.synth_times.dedup();
1019 }
1020 }
1021 else if interesting {
1022 // We cannot process `next_time` now, and must delay it.
1023 //
1024 // I think we are probably only here because of an uninteresting time declared interesting,
1025 // as initial interesting times are filtered to be in interval, and synthetic times are also
1026 // filtered before introducing them to `self.synth_times`.
1027 new_interesting.push(next_time.clone());
1028 debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
1029 }
1030
1031
1032 // Update `meet` to track the meet of each source of times.
1033 meet = None;//T::maximum();
1034 update_meet(&mut meet, batch_replay.meet());
1035 update_meet(&mut meet, input_replay.meet());
1036 update_meet(&mut meet, output_replay.meet());
1037 for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
1038 // if let Some(time) = batch_replay.meet() { meet = meet.meet(time); }
1039 // if let Some(time) = input_replay.meet() { meet = meet.meet(time); }
1040 // if let Some(time) = output_replay.meet() { meet = meet.meet(time); }
1041 // for time in self.synth_times.iter() { meet = meet.meet(time); }
1042 update_meet(&mut meet, meets_slice.first());
1043 // if let Some(time) = meets_slice.first() { meet = meet.meet(time); }
1044
1045 // Update `times_current` by the frontier.
1046 if let Some(meet) = meet.as_ref() {
1047 for time in self.times_current.iter_mut() {
1048 *time = time.join(meet);
1049 }
1050 }
1051
1052 sort_dedup(&mut self.times_current);
1053 }
1054
1055 // Normalize the representation of `new_interesting`, deduplicating and ordering.
1056 sort_dedup(new_interesting);
1057
1058 (compute_counter, output_counter)
1059 }
1060 }
1061
1062 /// Updates an optional meet by an optional time.
1063 fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
1064 if let Some(time) = other {
1065 if let Some(meet) = meet.as_mut() {
1066 *meet = meet.meet(time);
1067 }
1068 if meet.is_none() {
1069 *meet = Some(time.clone());
1070 }
1071 }
1072 }
1073}