differential_dataflow/operators/iterate.rs
1//! Iterative application of a differential dataflow fragment.
2//!
3//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
4//! to a collection of the same type. The output collection is the result of applying this closure
5//! an unbounded number of times.
6//!
7//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
8//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
9//! (indicating that the computation has reached fixed point), or until some number of iterations
10//! have passed.
11//!
12//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
13//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
14//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
15//! be worried that logically cancelable differences may circulate indefinitely.
16//!
17//! # Details
18//!
19//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
20//! computations when `iterate` itself is not sufficient. This can happen when you have two
21//! collections that should evolve simultaneously, or when you would like to rotate your loop and
22//! return an intermediate result.
23//!
//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
//! complicated. You must create a new variable, either from an existing collection (its initial
//! value) with `Variable::new_from` or initially empty with `Variable::new`, and then set it to be
//! a function of this variable (and perhaps other collections and variables).
27//!
28//! A `Variable` dereferences to a `Collection`, the one corresponding to its value in each iteration,
29//! and it can be used in most situations where a collection can be used. The act of setting a
30//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
31//! it multiple times.
32
33use std::fmt::Debug;
34use std::ops::Deref;
35
36use timely::Container;
37use timely::progress::{Timestamp, PathSummary};
38use timely::order::Product;
39
40use timely::dataflow::*;
41use timely::dataflow::scopes::child::Iterative;
42use timely::dataflow::operators::{Feedback, ConnectLoop};
43use timely::dataflow::operators::feedback::Handle;
44
45use crate::{Data, Collection, AsCollection};
46use crate::difference::{Semigroup, Abelian};
47use crate::lattice::Lattice;
48
49/// An extension trait for the `iterate` method.
50pub trait Iterate<G: Scope<Timestamp: Lattice>, D: Data, R: Semigroup> {
51 /// Iteratively apply `logic` to the source collection until convergence.
52 ///
53 /// Importantly, this method does not automatically consolidate results.
54 /// It may be important to conclude with `consolidate()` to ensure that
55 /// logically empty collections that contain cancelling records do not
56 /// result in non-termination. Operators like `reduce`, `distinct`, and
57 /// `count` also perform consolidation, and are safe to conclude with.
58 ///
59 /// # Examples
60 ///
61 /// ```
62 /// use differential_dataflow::input::Input;
63 /// use differential_dataflow::operators::Iterate;
64 ///
65 /// ::timely::example(|scope| {
66 ///
67 /// scope.new_collection_from(1 .. 10u32).1
68 /// .iterate(|values| {
69 /// values.map(|x| if x % 2 == 0 { x/2 } else { x })
70 /// .consolidate()
71 /// });
72 /// });
73 /// ```
74 fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
75 where
76 for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R>;
77}
78
79impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Abelian+'static> Iterate<G, D, R> for Collection<G, D, R> {
80 fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
81 where
82 for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R>,
83 {
84 self.inner.scope().scoped("Iterate", |subgraph| {
85 // create a new variable, apply logic, bind variable, return.
86 //
87 // this could be much more succinct if we returned the collection
88 // wrapped by `variable`, but it also results in substantially more
89 // diffs produced; `result` is post-consolidation, and means fewer
90 // records are yielded out of the loop.
91 let variable = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
92 let result = logic(&variable);
93 variable.set(&result);
94 result.leave()
95 })
96 }
97}
98
99impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<G, D, R> for G {
100 fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
101 where
102 for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R>,
103 {
104 // TODO: This makes me think we have the wrong ownership pattern here.
105 let mut clone = self.clone();
106 clone
107 .scoped("Iterate", |subgraph| {
108 // create a new variable, apply logic, bind variable, return.
109 //
110 // this could be much more succinct if we returned the collection
111 // wrapped by `variable`, but it also results in substantially more
112 // diffs produced; `result` is post-consolidation, and means fewer
113 // records are yielded out of the loop.
114 let variable = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1));
115 let result = logic(&variable);
116 variable.set(&result);
117 result.leave()
118 }
119 )
120 }
121}
122
123/// A recursively defined collection.
124///
125/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
126/// iterative patterns than singly recursive iteration. For example: in mutual recursion two
127/// collections evolve simultaneously.
128///
129/// # Examples
130///
131/// The following example is equivalent to the example for the `Iterate` trait.
132///
133/// ```
134/// use timely::order::Product;
135/// use timely::dataflow::Scope;
136///
137/// use differential_dataflow::input::Input;
138/// use differential_dataflow::operators::iterate::Variable;
139///
140/// ::timely::example(|scope| {
141///
142/// let numbers = scope.new_collection_from(1 .. 10u32).1;
143///
144/// scope.iterative::<u64,_,_>(|nested| {
145/// let summary = Product::new(Default::default(), 1);
146/// let variable = Variable::new_from(numbers.enter(nested), summary);
147/// let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
148/// .consolidate();
149/// variable.set(&result)
150/// .leave()
151/// });
152/// })
153/// ```
154pub struct Variable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
155where
156 G: Scope<Timestamp: Lattice>,
157 D: Data,
158 R: Abelian + 'static,
159 C: Container + Clone + 'static,
160{
161 collection: Collection<G, D, R, C>,
162 feedback: Handle<G, C>,
163 source: Option<Collection<G, D, R, C>>,
164 step: <G::Timestamp as Timestamp>::Summary,
165}
166
167impl<G, D: Data, R: Abelian, C: Container + Clone + 'static> Variable<G, D, R, C>
168where
169 G: Scope<Timestamp: Lattice>,
170 StreamCore<G, C>: crate::operators::Negate<G, C> + ResultsIn<G, C>,
171{
172 /// Creates a new initially empty `Variable`.
173 ///
174 /// This method produces a simpler dataflow graph than `new_from`, and should
175 /// be used whenever the variable has an empty input.
176 pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
177 let (feedback, updates) = scope.feedback(step.clone());
178 let collection = Collection::<G, D, R, C>::new(updates);
179 Self { collection, feedback, source: None, step }
180 }
181
182 /// Creates a new `Variable` from a supplied `source` stream.
183 pub fn new_from(source: Collection<G, D, R, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
184 let (feedback, updates) = source.inner.scope().feedback(step.clone());
185 let collection = Collection::<G, D, R, C>::new(updates).concat(&source);
186 Variable { collection, feedback, source: Some(source), step }
187 }
188
189 /// Set the definition of the `Variable` to a collection.
190 ///
191 /// This method binds the `Variable` to be equal to the supplied collection,
192 /// which may be recursively defined in terms of the variable itself.
193 pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
194 let mut in_result = result.clone();
195 if let Some(source) = &self.source {
196 in_result = in_result.concat(&source.negate());
197 }
198 self.set_concat(&in_result)
199 }
200
201 /// Set the definition of the `Variable` to a collection concatenated to `self`.
202 ///
203 /// This method is a specialization of `set` which has the effect of concatenating
204 /// `result` and `self` before calling `set`. This method avoids some dataflow
205 /// complexity related to retracting the initial input, and will do less work in
206 /// that case.
207 ///
208 /// This behavior can also be achieved by using `new` to create an empty initial
209 /// collection, and then using `self.set(self.concat(result))`.
210 pub fn set_concat(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
211 let step = self.step;
212 result
213 .inner
214 .results_in(step)
215 .connect_loop(self.feedback);
216
217 self.collection
218 }
219}
220
221impl<G: Scope<Timestamp: Lattice>, D: Data, R: Abelian, C: Container + Clone + 'static> Deref for Variable<G, D, R, C> {
222 type Target = Collection<G, D, R, C>;
223 fn deref(&self) -> &Self::Target {
224 &self.collection
225 }
226}
227
228/// A recursively defined collection that only "grows".
229///
230/// `SemigroupVariable` is a weakening of `Variable` to allow difference types
231/// that do not implement `Abelian` and only implement `Semigroup`. This means
232/// that it can be used in settings where the difference type does not support
233/// negation.
234pub struct SemigroupVariable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
235where
236 G: Scope<Timestamp: Lattice>,
237 D: Data,
238 R: Semigroup + 'static,
239 C: Container + Clone + 'static,
240{
241 collection: Collection<G, D, R, C>,
242 feedback: Handle<G, C>,
243 step: <G::Timestamp as Timestamp>::Summary,
244}
245
246impl<G, D: Data, R: Semigroup, C: Container+Clone> SemigroupVariable<G, D, R, C>
247where
248 G: Scope<Timestamp: Lattice>,
249 StreamCore<G, C>: ResultsIn<G, C>,
250{
251 /// Creates a new initially empty `SemigroupVariable`.
252 pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
253 let (feedback, updates) = scope.feedback(step.clone());
254 let collection = Collection::<G,D,R,C>::new(updates);
255 SemigroupVariable { collection, feedback, step }
256 }
257
258 /// Adds a new source of data to `self`.
259 pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
260 let step = self.step;
261 result
262 .inner
263 .results_in(step)
264 .connect_loop(self.feedback);
265
266 self.collection
267 }
268}
269
270impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone+'static> Deref for SemigroupVariable<G, D, R, C> where G::Timestamp: Lattice {
271 type Target = Collection<G, D, R, C>;
272 fn deref(&self) -> &Self::Target {
273 &self.collection
274 }
275}
276
277/// Extension trait for streams.
278pub trait ResultsIn<G: Scope, C> {
279 /// Advances a timestamp in the stream according to the timestamp actions on the path.
280 ///
281 /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
282 /// incrementing fields would result in integer overflow. In this case, the record is dropped.
283 ///
284 /// # Examples
285 /// ```
286 /// use timely::dataflow::Scope;
287 /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
288 ///
289 /// use differential_dataflow::input::Input;
290 /// use differential_dataflow::operators::ResultsIn;
291 ///
292 /// timely::example(|scope| {
293 /// let summary1 = 5;
294 ///
295 /// let data = scope.new_collection_from(1 .. 10).1;
296 /// /// Applies `results_in` on every timestamp in the collection.
297 /// data.results_in(summary1);
298 /// });
299 /// ```
300 fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self;
301}
302
303impl<G, D, R, C> ResultsIn<G, C> for Collection<G, D, R, C>
304where
305 G: Scope,
306 C: Clone,
307 StreamCore<G, C>: ResultsIn<G, C>,
308{
309 fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
310 self.inner.results_in(step).as_collection()
311 }
312}
313
314impl<G: Scope, D: timely::Data, R: timely::Data> ResultsIn<G, Vec<(D, G::Timestamp, R)>> for Stream<G, (D, G::Timestamp, R)> {
315 fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
316 use timely::dataflow::operators::Map;
317 self.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
318 }
319}