differential_dataflow/operators/
iterate.rs

1//! Iterative application of a differential dataflow fragment.
2//!
3//! The `iterate` operator takes as an argument a closure from a differential dataflow collection
4//! to a collection of the same type. The output collection is the result of applying this closure
5//! an unbounded number of times.
6//!
7//! The implementation of `iterate` does not directly apply the closure, but rather establishes an
8//! iterative timely dataflow subcomputation, in which differences circulate until they dissipate
9//! (indicating that the computation has reached a fixed point), or until some number of iterations
10//! have passed.
11//!
12//! **Note**: The dataflow assembled by `iterate` does not automatically insert `consolidate` for
13//! you. This means that either (i) you should insert one yourself, (ii) you should be certain that
14//! all paths from the input to the output of the loop involve consolidation, or (iii) you should
15//! be worried that logically cancelable differences may circulate indefinitely.
16//!
17//! # Details
18//!
19//! The `iterate` method is written using a `Variable`, which lets you define your own iterative
20//! computations when `iterate` itself is not sufficient. This can happen when you have two
21//! collections that should evolve simultaneously, or when you would like to rotate your loop and
22//! return an intermediate result.
23//!
24//! Using `Variable` requires more explicit arrangement of your computation, but isn't much more
25//! complicated. You must define a new variable from an existing stream (its initial value), and
26//! then set it to be a function of this variable (and perhaps other collections and variables).
27//!
28//! A `Variable` dereferences to a `Collection`, the one corresponding to its value in each iteration,
29//! and it can be used in most situations where a collection can be used. The act of setting a
30//! `Variable` consumes it and returns the corresponding `Collection`, preventing you from setting
31//! it multiple times.
32
33use std::fmt::Debug;
34use std::ops::Deref;
35
36use timely::Container;
37use timely::progress::{Timestamp, PathSummary};
38use timely::order::Product;
39
40use timely::dataflow::*;
41use timely::dataflow::scopes::child::Iterative;
42use timely::dataflow::operators::{Feedback, ConnectLoop};
43use timely::dataflow::operators::feedback::Handle;
44
45use crate::{Data, Collection, AsCollection};
46use crate::difference::{Semigroup, Abelian};
47use crate::lattice::Lattice;
48
49/// An extension trait for the `iterate` method.
50pub trait Iterate<G: Scope, D: Data, R: Semigroup> {
51    /// Iteratively apply `logic` to the source collection until convergence.
52    ///
53    /// Importantly, this method does not automatically consolidate results.
54    /// It may be important to conclude with `consolidate()` to ensure that
55    /// logically empty collections that contain cancelling records do not
56    /// result in non-termination. Operators like `reduce`, `distinct`, and
57    /// `count` also perform consolidation, and are safe to conclude with.
58    ///
59    /// # Examples
60    ///
61    /// ```
62    /// use differential_dataflow::input::Input;
63    /// use differential_dataflow::operators::Iterate;
64    ///
65    /// ::timely::example(|scope| {
66    ///
67    ///     scope.new_collection_from(1 .. 10u32).1
68    ///          .iterate(|values| {
69    ///              values.map(|x| if x % 2 == 0 { x/2 } else { x })
70    ///                    .consolidate()
71    ///          });
72    /// });
73    /// ```
74    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
75        where
76            G::Timestamp: Lattice,
77            for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R>;
78}
79
80impl<G: Scope, D: Ord+Data+Debug, R: Abelian+'static> Iterate<G, D, R> for Collection<G, D, R> {
81    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
82        where G::Timestamp: Lattice,
83              for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R> {
84
85        self.inner.scope().scoped("Iterate", |subgraph| {
86            // create a new variable, apply logic, bind variable, return.
87            //
88            // this could be much more succinct if we returned the collection
89            // wrapped by `variable`, but it also results in substantially more
90            // diffs produced; `result` is post-consolidation, and means fewer
91            // records are yielded out of the loop.
92            let variable = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
93            let result = logic(&variable);
94            variable.set(&result);
95            result.leave()
96        })
97    }
98}
99
100impl<G: Scope, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<G, D, R> for G {
101    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
102        where G::Timestamp: Lattice,
103              for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)->Collection<Iterative<'a, G, u64>, D, R> {
104
105        // TODO: This makes me think we have the wrong ownership pattern here.
106        let mut clone = self.clone();
107        clone
108            .scoped("Iterate", |subgraph| {
109                // create a new variable, apply logic, bind variable, return.
110                //
111                // this could be much more succinct if we returned the collection
112                // wrapped by `variable`, but it also results in substantially more
113                // diffs produced; `result` is post-consolidation, and means fewer
114                // records are yielded out of the loop.
115                let variable = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1));
116                let result = logic(&variable);
117                variable.set(&result);
118                result.leave()
119            }
120        )
121    }
122}
123
124/// A recursively defined collection.
125///
126/// The `Variable` struct allows differential dataflow programs requiring more sophisticated
127/// iterative patterns than singly recursive iteration. For example: in mutual recursion two
128/// collections evolve simultaneously.
129///
130/// # Examples
131///
132/// The following example is equivalent to the example for the `Iterate` trait.
133///
134/// ```
135/// use timely::order::Product;
136/// use timely::dataflow::Scope;
137///
138/// use differential_dataflow::input::Input;
139/// use differential_dataflow::operators::iterate::Variable;
140///
141/// ::timely::example(|scope| {
142///
143///     let numbers = scope.new_collection_from(1 .. 10u32).1;
144///
145///     scope.iterative::<u64,_,_>(|nested| {
146///         let summary = Product::new(Default::default(), 1);
147///         let variable = Variable::new_from(numbers.enter(nested), summary);
148///         let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
149///                              .consolidate();
150///         variable.set(&result)
151///                 .leave()
152///     });
153/// })
154/// ```
155pub struct Variable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
156where
157    G: Scope,
158    G::Timestamp: Lattice,
159    D: Data,
160    R: Abelian + 'static,
161    C: Container + Clone + 'static,
162{
163    collection: Collection<G, D, R, C>,
164    feedback: Handle<G, C>,
165    source: Option<Collection<G, D, R, C>>,
166    step: <G::Timestamp as Timestamp>::Summary,
167}
168
169impl<G: Scope, D: Data, R: Abelian, C: Container + Clone + 'static> Variable<G, D, R, C>
170where
171    G::Timestamp: Lattice,
172    StreamCore<G, C>: crate::operators::Negate<G, C> + ResultsIn<G, C>,
173{
174    /// Creates a new initially empty `Variable`.
175    ///
176    /// This method produces a simpler dataflow graph than `new_from`, and should
177    /// be used whenever the variable has an empty input.
178    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
179        let (feedback, updates) = scope.feedback(step.clone());
180        let collection = Collection::<G, D, R, C>::new(updates);
181        Self { collection, feedback, source: None, step }
182    }
183
184    /// Creates a new `Variable` from a supplied `source` stream.
185    pub fn new_from(source: Collection<G, D, R, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
186        let (feedback, updates) = source.inner.scope().feedback(step.clone());
187        let collection = Collection::<G, D, R, C>::new(updates).concat(&source);
188        Variable { collection, feedback, source: Some(source), step }
189    }
190
191    /// Set the definition of the `Variable` to a collection.
192    ///
193    /// This method binds the `Variable` to be equal to the supplied collection,
194    /// which may be recursively defined in terms of the variable itself.
195    pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
196        let mut in_result = result.clone();
197        if let Some(source) = &self.source {
198            in_result = in_result.concat(&source.negate());
199        }
200        self.set_concat(&in_result)
201    }
202
203    /// Set the definition of the `Variable` to a collection concatenated to `self`.
204    ///
205    /// This method is a specialization of `set` which has the effect of concatenating
206    /// `result` and `self` before calling `set`. This method avoids some dataflow
207    /// complexity related to retracting the initial input, and will do less work in
208    /// that case.
209    ///
210    /// This behavior can also be achieved by using `new` to create an empty initial
211    /// collection, and then using `self.set(self.concat(result))`.
212    pub fn set_concat(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
213        let step = self.step;
214        result
215            .inner
216            .results_in(step)
217            .connect_loop(self.feedback);
218
219        self.collection
220    }
221}
222
223impl<G: Scope, D: Data, R: Abelian, C: Container + Clone + 'static> Deref for Variable<G, D, R, C> where G::Timestamp: Lattice {
224    type Target = Collection<G, D, R, C>;
225    fn deref(&self) -> &Self::Target {
226        &self.collection
227    }
228}
229
230/// A recursively defined collection that only "grows".
231///
232/// `SemigroupVariable` is a weakening of `Variable` to allow difference types
233/// that do not implement `Abelian` and only implement `Semigroup`. This means
234/// that it can be used in settings where the difference type does not support
235/// negation.
236pub struct SemigroupVariable<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
237where
238    G::Timestamp: Lattice,
239    G: Scope,
240    D: Data,
241    R: Semigroup + 'static,
242    C: Container + Clone + 'static,
243{
244    collection: Collection<G, D, R, C>,
245    feedback: Handle<G, C>,
246    step: <G::Timestamp as Timestamp>::Summary,
247}
248
249impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone> SemigroupVariable<G, D, R, C>
250where
251    G::Timestamp: Lattice,
252    StreamCore<G, C>: ResultsIn<G, C>,
253{
254    /// Creates a new initially empty `SemigroupVariable`.
255    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
256        let (feedback, updates) = scope.feedback(step.clone());
257        let collection = Collection::<G,D,R,C>::new(updates);
258        SemigroupVariable { collection, feedback, step }
259    }
260
261    /// Adds a new source of data to `self`.
262    pub fn set(self, result: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
263        let step = self.step;
264        result
265            .inner
266            .results_in(step)
267            .connect_loop(self.feedback);
268
269        self.collection
270    }
271}
272
273impl<G: Scope, D: Data, R: Semigroup, C: Container+Clone+'static> Deref for SemigroupVariable<G, D, R, C> where G::Timestamp: Lattice {
274    type Target = Collection<G, D, R, C>;
275    fn deref(&self) -> &Self::Target {
276        &self.collection
277    }
278}
279
280/// Extension trait for streams.
281pub trait ResultsIn<G: Scope, C> {
282    /// Advances a timestamp in the stream according to the timestamp actions on the path.
283    ///
284    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
285    /// incrementing fields would result in integer overflow. In this case, the record is dropped.
286    ///
287    /// # Examples
288    /// ```
289    /// use timely::dataflow::Scope;
290    /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
291    ///
292    /// use differential_dataflow::input::Input;
293    /// use differential_dataflow::operators::ResultsIn;
294    ///
295    /// timely::example(|scope| {
296    ///     let summary1 = 5;
297    ///
298    ///     let data = scope.new_collection_from(1 .. 10).1;
299    ///     /// Applies `results_in` on every timestamp in the collection.
300    ///     data.results_in(summary1);
301    /// });
302    /// ```
303    fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self;
304}
305
306impl<G, D, R, C> ResultsIn<G, C> for Collection<G, D, R, C>
307where
308    G: Scope,
309    C: Clone,
310    StreamCore<G, C>: ResultsIn<G, C>,
311{
312    fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
313        self.inner.results_in(step).as_collection()
314    }
315}
316
317impl<G: Scope, D: timely::Data, R: timely::Data> ResultsIn<G, Vec<(D, G::Timestamp, R)>> for Stream<G, (D, G::Timestamp, R)> {
318    fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self {
319        use timely::dataflow::operators::Map;
320        self.flat_map(move |(x,t,d)| step.results_in(&t).map(|t| (x,t,d)))
321    }
322}