differential_dataflow/operators/reduce.rs
1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use timely::Container;
9use timely::container::PushInto;
10use crate::hashable::Hashable;
11use crate::{Data, ExchangeData, Collection, IntoOwned};
12use crate::difference::{Semigroup, Abelian};
13
14use timely::order::PartialOrder;
15use timely::progress::frontier::Antichain;
16use timely::progress::Timestamp;
17use timely::dataflow::*;
18use timely::dataflow::operators::Operator;
19use timely::dataflow::channels::pact::Pipeline;
20use timely::dataflow::operators::Capability;
21
22use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
23use crate::lattice::Lattice;
24use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
25use crate::trace::cursor::CursorList;
26use crate::trace::implementations::{KeySpine, KeyBuilder, ValSpine, ValBuilder};
27
28use crate::trace::TraceReader;
29
/// Extension trait for the `reduce` differential dataflow method.
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Applies a reduction function on records grouped by key.
    ///
    /// Input data must be structured as `(key, val)` pairs.
    /// The user-supplied reduction function takes as arguments
    ///
    /// 1. a reference to the key,
    /// 2. a reference to the slice of values and their accumulated updates,
    /// 3. a mutable reference to a vector to populate with output values and accumulated updates.
    ///
    /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
    /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
    /// `Ord` implementations.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Reduce;
    ///
    /// ::timely::example(|scope| {
    ///     // report the smallest value for each group
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| (x / 3, x))
    ///          .reduce(|_key, input, output| {
    ///              output.push((*input[0].0, 1))
    ///          });
    /// });
    /// ```
    fn reduce<L, V2: Data, R2: Ord+Abelian+'static>(&self, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
        // Forward to the named variant with a default operator name.
        self.reduce_named("Reduce", logic)
    }

    /// As `reduce` with the ability to name the operator.
    fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static;
}
69
70impl<G, K, V, R> Reduce<G, K, V, R> for Collection<G, (K, V), R>
71 where
72 G: Scope,
73 G::Timestamp: Lattice+Ord,
74 K: ExchangeData+Hashable,
75 V: ExchangeData,
76 R: ExchangeData+Semigroup,
77 {
78 fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
79 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
80 self.arrange_by_key_named(&format!("Arrange: {}", name))
81 .reduce_named(name, logic)
82 }
83}
84
85impl<G, K: Data, V: Data, T1, R: Ord+Semigroup+'static> Reduce<G, K, V, R> for Arranged<G, T1>
86where
87 G: Scope<Timestamp=T1::Time>,
88 T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a V, Diff=R>+Clone+'static,
89 for<'a> T1::Key<'a> : IntoOwned<'a, Owned = K>,
90 for<'a> T1::Val<'a> : IntoOwned<'a, Owned = V>,
91{
92 fn reduce_named<L, V2: Data, R2: Ord+Abelian+'static>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
93 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
94 self.reduce_abelian::<_,K,V2,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(name, logic)
95 .as_collection(|k,v| (k.clone(), v.clone()))
96 }
97}
98
/// Extension trait for the `threshold` and `distinct` differential dataflow methods.
pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Transforms the multiplicity of records.
    ///
    /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
    /// least the computation may behave as if it does. Otherwise, the transformation
    /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .threshold(|_,c| c % 2);
    /// });
    /// ```
    fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        // Forward to the named variant with a default operator name.
        self.threshold_named("Threshold", thresh)
    }

    /// A `threshold` with the ability to name the operator.
    fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2>;

    /// Reduces the collection to one occurrence of each distinct element.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .distinct();
    /// });
    /// ```
    fn distinct(&self) -> Collection<G, K, isize> {
        self.distinct_core()
    }

    /// Distinct for general integer differences.
    ///
    /// This method allows `distinct` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(&self) -> Collection<G, K, R2> {
        // Every record's multiplicity is mapped to one, regardless of its input count.
        self.threshold_named("Distinct", |_,_| R2::from(1i8))
    }
}
155
156impl<G: Scope, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1> for Collection<G, K, R1>
157where G::Timestamp: Lattice+Ord {
158 fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
159 self.arrange_by_self_named(&format!("Arrange: {}", name))
160 .threshold_named(name, thresh)
161 }
162}
163
164impl<G, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
165where
166 G: Scope<Timestamp=T1::Time>,
167 T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a (), Diff=R1>+Clone+'static,
168 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
169{
170 fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
171 self.reduce_abelian::<_,K,(),KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
172 .as_collection(|k,_| k.clone())
173 }
174}
175
/// Extension trait for the `count` differential dataflow method.
pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Counts the number of occurrences of each element.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Count;
    ///
    /// ::timely::example(|scope| {
    ///     // report the number of occurrences of each key
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .count();
    /// });
    /// ```
    fn count(&self) -> Collection<G, (K, R), isize> {
        // Forward to the generic variant with the conventional `isize` difference type.
        self.count_core()
    }

    /// Count for general integer differences.
    ///
    /// This method allows `count` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2>;
}
204
205impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K, R> for Collection<G, K, R>
206where
207 G::Timestamp: Lattice+Ord,
208{
209 fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
210 self.arrange_by_self_named("Arrange: Count")
211 .count_core()
212 }
213}
214
215impl<G, K: Data, T1, R: Data+Semigroup> Count<G, K, R> for Arranged<G, T1>
216where
217 G: Scope<Timestamp=T1::Time>,
218 T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a (), Diff=R>+Clone+'static,
219 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
220{
221 fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
222 self.reduce_abelian::<_,K,R,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
223 .as_collection(|k,c| (k.clone(), c.clone()))
224 }
225}
226
/// Extension trait for the `reduce_core` differential dataflow method.
pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
    ///
    /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
    /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::reduce::ReduceCore;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let trace =
    ///     scope.new_collection_from(1 .. 10u32).1
    ///          .map(|x| (x, x))
    ///          .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
    ///              "Example",
    ///              move |_key, src, dst| dst.push((*src[0].0, 1))
    ///          )
    ///          .trace;
    /// });
    /// ```
    fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<Key<'a>= &'a K, Time=G::Timestamp>+'static,
            for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
            T2::Diff: Abelian,
            T2::Batch: Batch,
            Bu: Builder<Time=T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
        {
            self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
                // Invoke user logic only for non-empty input, matching `reduce`'s contract
                // that the value slice handed to the user is never empty.
                if !input.is_empty() {
                    logic(key, input, change);
                }
                // Subtract the current output (negated updates) from the desired output so
                // that `change` holds exactly the delta to apply; requires `T2::Diff: Abelian`.
                change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
                // Collapse updates to the same value; cancelled updates vanish entirely.
                crate::consolidation::consolidate(change);
            })
        }

    /// Solves for output updates when presented with inputs and would-be outputs.
    ///
    /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
    /// and it may not be safe to index into the first element.
    /// At least one of the two collections will be non-empty.
    fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
            for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
            T2::Batch: Batch,
            Bu: Builder<Time=T2::Time, Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
            ;
}
286
287impl<G, K, V, R> ReduceCore<G, K, V, R> for Collection<G, (K, V), R>
288where
289 G: Scope,
290 G::Timestamp: Lattice+Ord,
291 K: ExchangeData+Hashable,
292 V: ExchangeData,
293 R: ExchangeData+Semigroup,
294{
295 fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
296 where
297 V: Data,
298 T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
299 for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
300 T2::Batch: Batch,
301 Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
302 L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
303 {
304 self.arrange_by_key_named(&format!("Arrange: {}", name))
305 .reduce_core::<_,_,_,Bu,_>(name, logic)
306 }
307}
308
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
///
/// The `logic` callback receives a key, the accumulated input values for that key,
/// a vector of would-be output values, and a vector into which it records output
/// updates; see `ReduceCore::reduce_core` for the contract it must satisfy.
pub fn reduce_trace<G, T1, Bu, T2, K, V, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
    G: Scope<Timestamp=T1::Time>,
    T1: TraceReader + Clone + 'static,
    for<'a> T1::Key<'a> : IntoOwned<'a, Owned = K>,
    T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time> + 'static,
    K: Ord + 'static,
    V: Data,
    for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
    T2::Batch: Batch,
    Bu: Builder<Time=T2::Time, Output = T2::Batch>,
    Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
    L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
    // Slot through which the operator construction closure hands back the output trace handle.
    let mut result_trace = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let stream = {

        let result_trace = &mut result_trace;
        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

            // Logger for arrangement events, if one is registered with the scope.
            let logger = {
                let scope = trace.stream.scope();
                let register = scope.log_register();
                register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into)
            };

            let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
            let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
            // If there is default exert logic set, install it.
            if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
                empty.set_exert_logic(exert_logic);
            }


            let mut source_trace = trace.trace.clone();

            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);

            // let mut output_trace = TraceRc::make_from(agent).0;
            *result_trace = Some(output_reader.clone());

            // let mut thinker1 = history_replay_prior::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            // let mut thinker = history_replay::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            let mut new_interesting_times = Vec::<G::Timestamp>::new();

            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
            // as well as capabilities for these times (or their lower envelope, at least).
            let mut interesting = Vec::<(K, G::Timestamp)>::new();
            let mut capabilities = Vec::<Capability<G::Timestamp>>::new();

            // buffers and logic for computing per-key interesting times "efficiently".
            let mut interesting_times = Vec::<G::Timestamp>::new();

            // Upper and lower frontiers for the pending input and output batches to process.
            let mut upper_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut lower_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            // Output batches may need to be built piecemeal, and these temp storage help there.
            let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            // Worker index, used only to identify this worker in diagnostic output below.
            let id = trace.stream.scope().index();

            move |input, output| {

                // The `reduce` operator receives fully formed batches, which each serve as an indication
                // that the frontier has advanced to the upper bound of their description.
                //
                // Although we could act on each individually, several may have been sent, and it makes
                // sense to accumulate them first to coordinate their re-evaluation. We will need to pay
                // attention to which times need to be collected under which capability, so that we can
                // assemble output batches correctly. We will maintain several builders concurrently, and
                // place output updates into the appropriate builder.
                //
                // It turns out we must use notificators, as we cannot await empty batches from arrange to
                // indicate progress, as the arrange may not hold the capability to send such. Instead, we
                // must watch for progress here (and the upper bound of received batches) to tell us how
                // far we can process work.
                //
                // We really want to retire all batches we receive, so we want a frontier which reflects
                // both information from batches as well as progress information. I think this means that
                // we keep times that are greater than or equal to a time in the other frontier, deduplicated.

                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrade previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                // Drain the input stream of batches, validating the contiguity of the batch descriptions and
                // capturing a cursor for each of the batches as well as ensuring we hold a capability for the
                // times in the batch.
                input.for_each(|capability, batches| {

                    for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }

                    // Ensure that `capabilities` covers the capability of the batch.
                    capabilities.retain(|cap| !capability.time().less_than(cap.time()));
                    if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) {
                        capabilities.push(capability.retain());
                    }
                });

                // Pull in any subsequent empty batches we believe to exist.
                source_trace.advance_upper(&mut upper_limit);

                // Only if our upper limit has advanced should we do work.
                if upper_limit != lower_limit {

                    // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send
                    // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches
                    // to indicate forward progress, and must hope that downstream operators look at progress frontiers
                    // as well as batch descriptions.
                    //
                    // We can (and should) advance source and output traces if `upper_limit` indicates this is possible.
                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

                        // `interesting` contains "warnings" about keys and times that may need to be re-considered.
                        // We first extract those times from this list that lie in the interval we will process.
                        sort_dedup(&mut interesting);
                        // `exposed` contains interesting (key, time)s now below `upper_limit`
                        let exposed = {
                            let (exposed, new_interesting) = interesting.drain(..).partition(|(_, time)| !upper_limit.less_equal(time));
                            interesting = new_interesting;
                            exposed
                        };

                        // Prepare an output buffer and builder for each capability.
                        //
                        // We buffer and build separately, as outputs are produced grouped by time, whereas the
                        // builder wants to see outputs grouped by value. While the per-key computation could
                        // do the re-sorting itself, buffering per-key outputs lets us double check the results
                        // against other implementations for accuracy.
                        //
                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
                        //       this as long as it requires that there is only one capability for each message.
                        let mut buffers = Vec::<(G::Timestamp, Vec<(V, G::Timestamp, T2::Diff)>)>::new();
                        let mut builders = Vec::new();
                        for cap in capabilities.iter() {
                            buffers.push((cap.time().clone(), Vec::new()));
                            builders.push(Bu::new());
                        }

                        // Staging container through which single updates flow into the builders.
                        let mut buffer = Bu::Input::default();

                        // cursors for navigating input and output traces.
                        let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
                        let source_storage = &source_storage;
                        let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
                        let output_storage = &output_storage;
                        let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);
                        let batch_storage = &batch_storage;

                        let mut thinker = history_replay::HistoryReplayer::new();

                        // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`.
                        //
                        // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length
                        // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
                        // There could perhaps be a less provocative variable name.
                        let mut exposed_position = 0;
                        while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() {

                            use std::borrow::Borrow;

                            // Determine the next key we will work on; could be synthetic, could be from a batch.
                            let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0));
                            let key2 = batch_cursor.get_key(batch_storage);
                            // Process keys in ascending order, merging the two sources.
                            let key = match (key1, key2) {
                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                                (Some(key1), None) => key1,
                                (None, Some(key2)) => key2,
                                (None, None) => unreachable!(),
                            };

                            // `interesting_times` contains those times between `lower_issued` and `upper_limit`
                            // that we need to re-consider. We now populate it, but perhaps this should be left
                            // to the per-key computation, which may be able to avoid examining the times of some
                            // values (for example, in the case of min/max/topk).
                            interesting_times.clear();

                            // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
                            while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&<T1::Key<'_> as IntoOwned>::borrow_as(&k))).unwrap_or(false) {
                                interesting_times.push(exposed[exposed_position].1.clone());
                                exposed_position += 1;
                            }

                            // tidy up times, removing redundancy.
                            sort_dedup(&mut interesting_times);

                            // do the per-key computation.
                            let _counters = thinker.compute(
                                key,
                                (&mut source_cursor, source_storage),
                                (&mut output_cursor, output_storage),
                                (&mut batch_cursor, batch_storage),
                                &mut interesting_times,
                                &mut logic,
                                &upper_limit,
                                &mut buffers[..],
                                &mut new_interesting_times,
                            );

                            // Advance past this key's batch data, if it was drawn from the batch.
                            if batch_cursor.get_key(batch_storage) == Some(key) {
                                batch_cursor.step_key(batch_storage);
                            }

                            // Record future warnings about interesting times (and assert they should be "future").
                            for time in new_interesting_times.drain(..) {
                                debug_assert!(upper_limit.less_equal(&time));
                                interesting.push((key.into_owned(), time));
                            }

                            // Sort each buffer by value and move into the corresponding builder.
                            // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
                            //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
                            //       arbitrarily ordered times.
                            for index in 0 .. buffers.len() {
                                buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                                for (val, time, diff) in buffers[index].1.drain(..) {
                                    buffer.push_into(((key.into_owned(), val), time, diff));
                                    builders[index].push(&mut buffer);
                                    buffer.clear();
                                }
                            }
                        }

                        // We start sealing output batches from the lower limit (previous upper limit).
                        // In principle, we could update `lower_limit` itself, and it should arrive at
                        // `upper_limit` by the end of the process.
                        output_lower.clear();
                        output_lower.extend(lower_limit.borrow().iter().cloned());

                        // build and ship each batch (because only one capability per message).
                        for (index, builder) in builders.drain(..).enumerate() {

                            // Form the upper limit of the next batch, which includes all times greater
                            // than the input batch, or the capabilities from i + 1 onward.
                            output_upper.clear();
                            output_upper.extend(upper_limit.borrow().iter().cloned());
                            for capability in &capabilities[index + 1 ..] {
                                output_upper.insert(capability.time().clone());
                            }

                            // Skip degenerate (empty-interval) batches entirely.
                            if output_upper.borrow() != output_lower.borrow() {

                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
                                let batch = builder.done(description);

                                // ship batch to the output, and commit to the output trace.
                                output.session(&capabilities[index]).give(batch.clone());
                                output_writer.insert(batch, Some(capabilities[index].time().clone()));

                                output_lower.clear();
                                output_lower.extend(output_upper.borrow().iter().cloned());
                            }
                        }

                        // This should be true, as the final iteration introduces no capabilities, and
                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
                        assert!(output_upper.borrow() == upper_limit.borrow());

                        // Determine the frontier of our interesting times.
                        let mut frontier = Antichain::<G::Timestamp>::new();
                        for (_, time) in &interesting {
                            frontier.insert_ref(time);
                        }

                        // Update `capabilities` to reflect interesting pairs described by `frontier`.
                        let mut new_capabilities = Vec::new();
                        for time in frontier.borrow().iter() {
                            if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.push(cap.delayed(time));
                            }
                            else {
                                println!("{}:\tfailed to find capability less than new frontier time:", id);
                                println!("{}:\t  time: {:?}", id, time);
                                println!("{}:\t  caps: {:?}", id, capabilities);
                                println!("{}:\t  uppr: {:?}", id, upper_limit);
                            }
                        }
                        capabilities = new_capabilities;

                        // ensure that observed progress is reflected in the output.
                        output_writer.seal(upper_limit.clone());
                    }
                    else {
                        output_writer.seal(upper_limit.clone());
                    }

                    // We only anticipate future times in advance of `upper_limit`.
                    source_trace.set_logical_compaction(upper_limit.borrow());
                    output_reader.set_logical_compaction(upper_limit.borrow());

                    // We will only slice the data between future batches.
                    source_trace.set_physical_compaction(upper_limit.borrow());
                    output_reader.set_physical_compaction(upper_limit.borrow());
                }

                // Exert trace maintenance if we have been so requested.
                output_writer.exert();
            }
        }
        )
    };

    Arranged { stream, trace: result_trace.unwrap() }
}
627
628
/// Sorts `list` and discards duplicate elements.
///
/// The leading `dedup` cheaply strips runs of adjacent repeats before the sort;
/// the trailing `dedup` removes the remaining duplicates once sorting has made
/// them adjacent.
#[inline(never)]
fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
    // Zero- and one-element lists need no work; all three passes would be no-ops.
    if list.len() > 1 {
        list.dedup();
        list.sort();
        list.dedup();
    }
}
635
/// A per-key compute strategy, as used by `reduce_trace`.
///
/// An implementation is handed cursors over one key's source trace, output trace,
/// and newly arrived batches, plus the times to reconsider. It populates `outputs`
/// with new output updates (grouped per capability time) and `new_interesting` with
/// times at or beyond `upper_limit` that must be revisited later.
trait PerKeyCompute<'a, C1, C2, C3, V>
where
    C1: Cursor,
    C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
    C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
    V: Clone + Ord,
    for<'b> C2::Val<'b> : IntoOwned<'b, Owned = V>,
{
    /// Constructs a new, empty instance of the strategy.
    fn new() -> Self;
    /// Performs the computation for one key; returns (compute, output) counters.
    fn compute<L>(
        &mut self,
        key: C1::Key<'a>,
        source_cursor: (&mut C1, &'a C1::Storage),
        output_cursor: (&mut C2, &'a C2::Storage),
        batch_cursor: (&mut C3, &'a C3::Storage),
        times: &mut Vec<C1::Time>,
        logic: &mut L,
        upper_limit: &Antichain<C1::Time>,
        outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
        new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
    where
        L: FnMut(
            C1::Key<'a>,
            &[(C1::Val<'a>, C1::Diff)],
            &mut Vec<(V, C2::Diff)>,
            &mut Vec<(V, C2::Diff)>,
        );
}
664
665
666/// Implementation based on replaying historical and new updates together.
667mod history_replay {
668
669 use timely::progress::Antichain;
670 use timely::PartialOrder;
671
672 use crate::lattice::Lattice;
673 use crate::trace::Cursor;
674 use crate::operators::ValueHistory;
675 use crate::IntoOwned;
676
677 use super::{PerKeyCompute, sort_dedup};
678
    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
    pub struct HistoryReplayer<'a, C1, C2, C3, V>
    where
        C1: Cursor,
        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        V: Clone + Ord,
    {
        // Replayable history of the key's updates in the source (input) trace.
        input_history: ValueHistory<'a, C1>,
        // Replayable history of the key's updates in the output trace.
        output_history: ValueHistory<'a, C2>,
        // Replayable history of the key's updates in newly received batches.
        batch_history: ValueHistory<'a, C3>,
        // Scratch buffer of accumulated input (value, diff) pairs; presumably fed to user logic — see `compute`.
        input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
        // Scratch buffer of accumulated output (value, diff) pairs.
        output_buffer: Vec<(V, C2::Diff)>,
        // Scratch buffer of proposed output updates.
        update_buffer: Vec<(V, C2::Diff)>,
        // Output updates produced so far during one `compute` invocation.
        output_produced: Vec<((V, C2::Time), C2::Diff)>,
        // Synthetic times (joins of observed times) still awaiting processing.
        synth_times: Vec<C1::Time>,
        // Meets of the suffixes of the supplied interesting times, used to advance loaded times.
        meets: Vec<C1::Time>,
        // Times relevant to the current position of the replay — TODO confirm exact use in `compute`.
        times_current: Vec<C1::Time>,
        // General-purpose temporary storage for times.
        temporary: Vec<C1::Time>,
    }
700
701 impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V>
702 where
703 C1: Cursor,
704 C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
705 C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
706 V: Clone + Ord,
707 for<'b> C2::Val<'b> : IntoOwned<'b, Owned = V>,
708 {
        /// Creates a `HistoryReplayer` with empty histories and scratch buffers.
        fn new() -> Self {
            HistoryReplayer {
                input_history: ValueHistory::new(),
                output_history: ValueHistory::new(),
                batch_history: ValueHistory::new(),
                input_buffer: Vec::new(),
                output_buffer: Vec::new(),
                update_buffer: Vec::new(),
                output_produced: Vec::new(),
                synth_times: Vec::new(),
                meets: Vec::new(),
                times_current: Vec::new(),
                temporary: Vec::new(),
            }
        }
    /// Replays the history of a single `key` and applies `logic` at each time that may
    /// require (re)evaluation, producing output updates and discovering future times.
    ///
    /// Candidate times are drawn from the newly arrived `batch_cursor` updates, the
    /// previously recorded `times` we were warned about, the replayed input and output
    /// histories, and "synthetic" times formed as joins of the above. Times not strictly
    /// prior to `upper_limit` are not processed now, but are pushed onto `new_interesting`
    /// for future rounds.
    ///
    /// Produced updates are appended both to `outputs` (bucketed by the capability whose
    /// time precedes the update time) and to `self.output_produced` (used to subtract
    /// previously produced output when re-evaluating subsequent times).
    ///
    /// Returns the pair (times considered for re-evaluation, times at which output
    /// updates were actually produced).
    #[inline(never)]
    fn compute<L>(
        &mut self,
        key: C1::Key<'a>,
        (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
        (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
        (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
        times: &mut Vec<C1::Time>,
        logic: &mut L,
        upper_limit: &Antichain<C1::Time>,
        outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
        new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
    where
        L: FnMut(
            C1::Key<'a>,
            &[(C1::Val<'a>, C1::Diff)],
            &mut Vec<(V, C2::Diff)>,
            &mut Vec<(V, C2::Diff)>,
        )
    {

        // The work we need to perform is at times defined principally by the contents of `batch_cursor`
        // and `times`, respectively "new work we just received" and "old times we were warned about".
        //
        // Our first step is to identify these times, so that we can use them to restrict the amount of
        // information we need to recover from `input` and `output`; as all times of interest will have
        // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
        // loaded times by performing the lattice `join` with this value.

        // Load the batch contents.
        let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.into_owned());

        // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
        // can be used to advance other historical times, which may consolidate their representation. As
        // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
        // history forward.

        self.meets.clear();
        self.meets.extend(times.iter().cloned());
        // Walk backwards so that `meets[i]` ends up as the meet of `times[i ..]`.
        for index in (1 .. self.meets.len()).rev() {
            self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
        }

        // Determine the meet of times in `batch` and `times`.
        let mut meet = None;
        update_meet(&mut meet, self.meets.get(0));
        update_meet(&mut meet, batch_replay.meet());

        // Having determined the meet, we can load the input and output histories, where we
        // advance all times by joining them with `meet`. The resulting times are more compact
        // and guaranteed to accumulate identically for times greater or equal to `meet`.

        // Load the input and output histories.
        let mut input_replay = if let Some(meet) = meet.as_ref() {
            self.input_history.replay_key(source_cursor, source_storage, key, |time| {
                let mut time = time.into_owned();
                time.join_assign(meet);
                time
            })
        }
        else {
            self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned())
        };
        let mut output_replay = if let Some(meet) = meet.as_ref() {
            self.output_history.replay_key(output_cursor, output_storage, key, |time| {
                let mut time = time.into_owned();
                time.join_assign(meet);
                time
            })
        }
        else {
            self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned())
        };

        self.synth_times.clear();
        self.times_current.clear();
        self.output_produced.clear();

        // The frontier of times we may still consider.
        // Derived from frontiers of our update histories, supplied times, and synthetic times.

        let mut times_slice = &times[..];
        let mut meets_slice = &self.meets[..];

        let mut compute_counter = 0;
        let mut output_counter = 0;

        // We have candidate times from `batch` and `times`, as well as times identified by either
        // `input` or `output`. Finally, we may have synthetic times produced as the join of times
        // we consider in the course of evaluation. As long as any of these times exist, we need to
        // keep examining times.
        while let Some(next_time) = [ batch_replay.time(),
                                      times_slice.first(),
                                      input_replay.time(),
                                      output_replay.time(),
                                      self.synth_times.last(),
                                    ].iter().cloned().flatten().min().cloned() {

            // Advance input and output history replayers. This marks applicable updates as active.
            input_replay.step_while_time_is(&next_time);
            output_replay.step_while_time_is(&next_time);

            // One of our goals is to determine if `next_time` is "interesting", meaning whether we
            // have any evidence that we should re-evaluate the user logic at this time. For a time
            // to be "interesting" it would need to be the join of times that include either a time
            // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.

            // Advance batch history, and capture whether an update exists at `next_time`.
            let mut interesting = batch_replay.step_while_time_is(&next_time);
            if interesting {
                if let Some(meet) = meet.as_ref() {
                    batch_replay.advance_buffer_by(meet);
                }
            }

            // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
            while self.synth_times.last() == Some(&next_time) {
                // We don't know enough about `next_time` to avoid putting it in to `times_current`.
                // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
                self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
                interesting = true;
            }
            while times_slice.first() == Some(&next_time) {
                // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
                self.times_current.push(times_slice[0].clone());
                times_slice = &times_slice[1..];
                meets_slice = &meets_slice[1..];
                interesting = true;
            }

            // Times could also be interesting if an interesting time is less than them, as they would join
            // and become the time itself. They may not equal the current time because whatever frontier we
            // are tracking may not have advanced far enough.
            // TODO: `batch_history` may or may not be super compact at this point, and so this check might
            // yield false positives if not sufficiently compact. Maybe we should look into this and see.
            interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
            interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));

            // We should only process times that are not in advance of `upper_limit`.
            //
            // We have no particular guarantee that known times will not be in advance of `upper_limit`.
            // We may have the guarantee that synthetic times will not be, as we test against the limit
            // before we add the time to `synth_times`.
            if !upper_limit.less_equal(&next_time) {

                // We should re-evaluate the computation if this is an interesting time.
                // If the time is uninteresting (and our logic is sound) it is not possible for there to be
                // output produced. This sounds like a good test to have for debug builds!
                if interesting {

                    compute_counter += 1;

                    // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
                    debug_assert!(self.input_buffer.is_empty());
                    meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet));
                    for &((value, ref time), ref diff) in input_replay.buffer().iter() {
                        if time.less_equal(&next_time) {
                            self.input_buffer.push((value, diff.clone()));
                        }
                        else {
                            // Not yet applicable; its join with `next_time` is a candidate future time.
                            self.temporary.push(next_time.join(time));
                        }
                    }
                    for &((value, ref time), ref diff) in batch_replay.buffer().iter() {
                        if time.less_equal(&next_time) {
                            self.input_buffer.push((value, diff.clone()));
                        }
                        else {
                            self.temporary.push(next_time.join(time));
                        }
                    }
                    crate::consolidation::consolidate(&mut self.input_buffer);

                    meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet));
                    for &((value, ref time), ref diff) in output_replay.buffer().iter() {
                        if time.less_equal(&next_time) {
                            self.output_buffer.push((value.into_owned(), diff.clone()));
                        }
                        else {
                            self.temporary.push(next_time.join(time));
                        }
                    }
                    for &((ref value, ref time), ref diff) in self.output_produced.iter() {
                        if time.less_equal(&next_time) {
                            self.output_buffer.push(((*value).to_owned(), diff.clone()));
                        }
                        else {
                            self.temporary.push(next_time.join(time));
                        }
                    }
                    crate::consolidation::consolidate(&mut self.output_buffer);

                    // Apply user logic if non-empty input and see what happens!
                    if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
                        logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
                        self.input_buffer.clear();
                        self.output_buffer.clear();
                    }

                    // Having subtracted output updates from user output, consolidate the results to determine
                    // if there is anything worth reporting. Note: this also orders the results by value, so
                    // that could make the above merging plan even easier.
                    crate::consolidation::consolidate(&mut self.update_buffer);

                    // Stash produced updates into both capability-indexed buffers and `output_produced`.
                    // The two locations are important, in that we will compact `output_produced` as we move
                    // through times, but we cannot compact the output buffers because we need their actual
                    // times.
                    if !self.update_buffer.is_empty() {

                        output_counter += 1;

                        // We *should* be able to find a capability for `next_time`. Any thing else would
                        // indicate a logical error somewhere along the way; either we release a capability
                        // we should have kept, or we have computed the output incorrectly (or both!)
                        let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
                        let idx = outputs.len() - idx.expect("failed to find index") - 1;
                        for (val, diff) in self.update_buffer.drain(..) {
                            self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
                            outputs[idx].1.push((val, next_time.clone(), diff));
                        }

                        // Advance times in `self.output_produced` and consolidate the representation.
                        // NOTE: We only do this when we add records; it could be that there are situations
                        // where we want to consolidate even without changes (because an initially
                        // large collection can now be collapsed).
                        if let Some(meet) = meet.as_ref() {
                            for entry in &mut self.output_produced {
                                (entry.0).1 = (entry.0).1.join(meet);
                            }
                        }
                        crate::consolidation::consolidate(&mut self.output_produced);
                    }
                }

                // Determine synthetic interesting times.
                //
                // Synthetic interesting times are produced differently for interesting and uninteresting
                // times. An uninteresting time must join with an interesting time to become interesting,
                // which means joins with `self.batch_history` and `self.times_current`. I think we can
                // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
                // joined against everything.

                // Any time, even uninteresting times, must be joined with the current accumulation of
                // batch times as well as the current accumulation of `times_current`.
                for &((_, ref time), _) in batch_replay.buffer().iter() {
                    if !time.less_equal(&next_time) {
                        self.temporary.push(time.join(&next_time));
                    }
                }
                for time in self.times_current.iter() {
                    if !time.less_equal(&next_time) {
                        self.temporary.push(time.join(&next_time));
                    }
                }

                sort_dedup(&mut self.temporary);

                // Introduce synthetic times, and re-organize if we add any.
                let synth_len = self.synth_times.len();
                for time in self.temporary.drain(..) {
                    // We can either service `join` now, or must delay for the future.
                    if upper_limit.less_equal(&time) {
                        debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
                        new_interesting.push(time);
                    }
                    else {
                        self.synth_times.push(time);
                    }
                }
                if self.synth_times.len() > synth_len {
                    // Kept sorted in *reverse* order, so that `last()` yields the least remaining time.
                    self.synth_times.sort_by(|x,y| y.cmp(x));
                    self.synth_times.dedup();
                }
            }
            else if interesting {
                // We cannot process `next_time` now, and must delay it.
                //
                // I think we are probably only here because of an uninteresting time declared interesting,
                // as initial interesting times are filtered to be in interval, and synthetic times are also
                // filtered before introducing them to `self.synth_times`.
                new_interesting.push(next_time.clone());
                debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
            }

            // Update `meet` to track the meet of each source of times.
            meet = None;
            update_meet(&mut meet, batch_replay.meet());
            update_meet(&mut meet, input_replay.meet());
            update_meet(&mut meet, output_replay.meet());
            for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
            update_meet(&mut meet, meets_slice.first());

            // Update `times_current` by the frontier.
            if let Some(meet) = meet.as_ref() {
                for time in self.times_current.iter_mut() {
                    *time = time.join(meet);
                }
            }

            sort_dedup(&mut self.times_current);
        }

        // Normalize the representation of `new_interesting`, deduplicating and ordering.
        sort_dedup(new_interesting);

        (compute_counter, output_counter)
    }
1068 }
1069
1070 /// Updates an optional meet by an optional time.
1071 fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
1072 if let Some(time) = other {
1073 if let Some(meet) = meet.as_mut() {
1074 *meet = meet.meet(time);
1075 }
1076 if meet.is_none() {
1077 *meet = Some(time.clone());
1078 }
1079 }
1080 }
1081}