differential_dataflow/
collection.rs

//! Types and traits associated with collections of data.
//!
//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
//!
//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
//! one collection into another, using operators defined on collections. This contrasts with a more
//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::Container;
14use timely::Data;
15use timely::progress::Timestamp;
16use timely::order::Product;
17use timely::dataflow::scopes::{Child, child::Iterative};
18use timely::dataflow::Scope;
19use timely::dataflow::operators::*;
20use timely::dataflow::StreamCore;
21
22use crate::difference::{Semigroup, Abelian, Multiply};
23use crate::lattice::Lattice;
24use crate::hashable::Hashable;
25
26/// A mutable collection of values of type `D`
27///
28/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
29/// differential dataflow computation, you write as if the collection is a static dataset to which you
30/// apply functional transformations, creating new collections. Once your computation is written, you
31/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
32/// propagate changes through your functional computation and report the corresponding changes to the
33/// output collections.
34///
35/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
36/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
37/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
38/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
39/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
40/// defaults to) `isize`, representing changes to the occurrence count of each record.
41#[derive(Clone)]
42pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
43    /// The underlying timely dataflow stream.
44    ///
45    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
46    /// not intended to be the idiomatic way to work with the collection.
47    pub inner: timely::dataflow::StreamCore<G, C>,
48    /// Phantom data for unreferenced `D` and `R` types.
49    phantom: std::marker::PhantomData<(D, R)>,
50}
51
52impl<G: Scope, D, R, C> Collection<G, D, R, C> {
53    /// Creates a new Collection from a timely dataflow stream.
54    ///
55    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
56    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
57    /// provides a `new_collection` method which will create a new collection for you without exposing
58    /// the underlying timely stream at all.
59    pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
60        Collection { inner: stream, phantom: std::marker::PhantomData }
61    }
62}
63impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
64    /// Creates a new collection accumulating the contents of the two collections.
65    ///
66    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
67    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
68    /// two collections.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use differential_dataflow::input::Input;
74    ///
75    /// ::timely::example(|scope| {
76    ///
77    ///     let data = scope.new_collection_from(1 .. 10).1;
78    ///
79    ///     let odds = data.filter(|x| x % 2 == 1);
80    ///     let evens = data.filter(|x| x % 2 == 0);
81    ///
82    ///     odds.concat(&evens)
83    ///         .assert_eq(&data);
84    /// });
85    /// ```
86    pub fn concat(&self, other: &Self) -> Self {
87        self.inner
88            .concat(&other.inner)
89            .as_collection()
90    }
91    /// Creates a new collection accumulating the contents of the two collections.
92    ///
93    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
94    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
95    /// two collections.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// use differential_dataflow::input::Input;
101    ///
102    /// ::timely::example(|scope| {
103    ///
104    ///     let data = scope.new_collection_from(1 .. 10).1;
105    ///
106    ///     let odds = data.filter(|x| x % 2 == 1);
107    ///     let evens = data.filter(|x| x % 2 == 0);
108    ///
109    ///     odds.concatenate(Some(evens))
110    ///         .assert_eq(&data);
111    /// });
112    /// ```
113    pub fn concatenate<I>(&self, sources: I) -> Self
114    where
115        I: IntoIterator<Item=Self>
116    {
117        self.inner
118            .concatenate(sources.into_iter().map(|x| x.inner))
119            .as_collection()
120    }
121    // Brings a Collection into a nested region.
122    ///
123    /// This method is a specialization of `enter` to the case where the nested scope is a region.
124    /// It removes the need for an operator that adjusts the timestamp.
125    pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
126        self.inner
127            .enter(child)
128            .as_collection()
129    }
130    /// Applies a supplied function to each batch of updates.
131    ///
132    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
133    /// timely dataflow capability associated with the batch of updates. The observed batching depends
134    /// on how the system executes, and may vary run to run.
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// use differential_dataflow::input::Input;
140    ///
141    /// ::timely::example(|scope| {
142    ///     scope.new_collection_from(1 .. 10).1
143    ///          .map_in_place(|x| *x *= 2)
144    ///          .filter(|x| x % 2 == 1)
145    ///          .inspect_container(|event| println!("event: {:?}", event));
146    /// });
147    /// ```
148    pub fn inspect_container<F>(&self, func: F) -> Self
149    where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static {
150        self.inner
151            .inspect_container(func)
152            .as_collection()
153    }
154    /// Attaches a timely dataflow probe to the output of a Collection.
155    ///
156    /// This probe is used to determine when the state of the Collection has stabilized and can
157    /// be read out.
158    pub fn probe(&self) -> probe::Handle<G::Timestamp> {
159        self.inner
160            .probe()
161    }
162    /// Attaches a timely dataflow probe to the output of a Collection.
163    ///
164    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
165    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
166    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
167    /// avoid swamping the system.
168    pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
169        Self::new(self.inner.probe_with(handle))
170    }
171    /// The scope containing the underlying timely dataflow stream.
172    pub fn scope(&self) -> G {
173        self.inner.scope()
174    }
175
176    /// Creates a new collection whose counts are the negation of those in the input.
177    ///
178    /// This method is most commonly used with `concat` to get those element in one collection but not another.
179    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
180    /// including negative counts.
181    ///
182    /// # Examples
183    ///
184    /// ```
185    /// use differential_dataflow::input::Input;
186    ///
187    /// ::timely::example(|scope| {
188    ///
189    ///     let data = scope.new_collection_from(1 .. 10).1;
190    ///
191    ///     let odds = data.filter(|x| x % 2 == 1);
192    ///     let evens = data.filter(|x| x % 2 == 0);
193    ///
194    ///     odds.negate()
195    ///         .concat(&data)
196    ///         .assert_eq(&evens);
197    /// });
198    /// ```
199    // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
200    //       an inherent method on `Collection`.
201    pub fn negate(&self) -> Collection<G, D, R, C> where StreamCore<G, C>: crate::operators::Negate<G, C> {
202        crate::operators::Negate::negate(&self.inner).as_collection()
203    }
204}
205
206impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
207    /// Creates a new collection by applying the supplied function to each input element.
208    ///
209    /// # Examples
210    ///
211    /// ```
212    /// use differential_dataflow::input::Input;
213    ///
214    /// ::timely::example(|scope| {
215    ///     scope.new_collection_from(1 .. 10).1
216    ///          .map(|x| x * 2)
217    ///          .filter(|x| x % 2 == 1)
218    ///          .assert_empty();
219    /// });
220    /// ```
221    pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
222    where D2: Data,
223          L: FnMut(D) -> D2 + 'static
224    {
225        self.inner
226            .map(move |(data, time, delta)| (logic(data), time, delta))
227            .as_collection()
228    }
229    /// Creates a new collection by applying the supplied function to each input element.
230    ///
231    /// Although the name suggests in-place mutation, this function does not change the source collection,
232    /// but rather re-uses the underlying allocations in its implementation. The method is semantically
233    /// equivalent to `map`, but can be more efficient.
234    ///
235    /// # Examples
236    ///
237    /// ```
238    /// use differential_dataflow::input::Input;
239    ///
240    /// ::timely::example(|scope| {
241    ///     scope.new_collection_from(1 .. 10).1
242    ///          .map_in_place(|x| *x *= 2)
243    ///          .filter(|x| x % 2 == 1)
244    ///          .assert_empty();
245    /// });
246    /// ```
247    pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
248    where L: FnMut(&mut D) + 'static {
249        self.inner
250            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
251            .as_collection()
252    }
253    /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
254    ///
255    /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
256    /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
257    /// attempting to consolidate the results.
258    ///
259    /// # Examples
260    ///
261    /// ```
262    /// use differential_dataflow::input::Input;
263    ///
264    /// ::timely::example(|scope| {
265    ///     scope.new_collection_from(1 .. 10).1
266    ///          .flat_map(|x| 0 .. x);
267    /// });
268    /// ```
269    pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
270        where G::Timestamp: Clone,
271              I: IntoIterator,
272              I::Item: Data,
273              L: FnMut(D) -> I + 'static {
274        self.inner
275            .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
276            .as_collection()
277    }
278    /// Creates a new collection containing those input records satisfying the supplied predicate.
279    ///
280    /// # Examples
281    ///
282    /// ```
283    /// use differential_dataflow::input::Input;
284    ///
285    /// ::timely::example(|scope| {
286    ///     scope.new_collection_from(1 .. 10).1
287    ///          .map(|x| x * 2)
288    ///          .filter(|x| x % 2 == 1)
289    ///          .assert_empty();
290    /// });
291    /// ```
292    pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
293    where L: FnMut(&D) -> bool + 'static {
294        self.inner
295            .filter(move |(data, _, _)| logic(data))
296            .as_collection()
297    }
298    /// Replaces each record with another, with a new difference type.
299    ///
300    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
301    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
302    ///
303    /// # Examples
304    ///
305    /// ```
306    /// use differential_dataflow::input::Input;
307    ///
308    /// ::timely::example(|scope| {
309    ///
310    ///     let nums = scope.new_collection_from(0 .. 10).1;
311    ///     let x1 = nums.flat_map(|x| 0 .. x);
312    ///     let x2 = nums.map(|x| (x, 9 - x))
313    ///                  .explode(|(x,y)| Some((x,y)));
314    ///
315    ///     x1.assert_eq(&x2);
316    /// });
317    /// ```
318    pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
319    where D2: Data,
320          R2: Semigroup+Multiply<R>,
321          <R2 as Multiply<R>>::Output: Semigroup+'static,
322          I: IntoIterator<Item=(D2,R2)>,
323          L: FnMut(D)->I+'static,
324    {
325        self.inner
326            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
327            .as_collection()
328    }
329
330    /// Joins each record against a collection defined by the function `logic`.
331    ///
332    /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
333    /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
334    /// modifications made to the results, namely joining timestamps and multiplying differences.
335    ///
336    /// #Examples
337    ///
338    /// ```
339    /// use differential_dataflow::input::Input;
340    ///
341    /// ::timely::example(|scope| {
342    ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
343    ///     // for x from 0 through 9.
344    ///     scope.new_collection_from(0 .. 10isize).1
345    ///          .join_function(|x|
346    ///              //   data      time      diff
347    ///              vec![(2*x, (3*x) as u64,  x),
348    ///                   (2*x, (4*x) as u64, -x)]
349    ///           );
350    /// });
351    /// ```
352    pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
353    where G::Timestamp: Lattice,
354          D2: Data,
355          R2: Semigroup+Multiply<R>,
356          <R2 as Multiply<R>>::Output: Semigroup+'static,
357          I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
358          L: FnMut(D)->I+'static,
359    {
360        self.inner
361            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
362            .as_collection()
363    }
364
365    /// Brings a Collection into a nested scope.
366    ///
367    /// # Examples
368    ///
369    /// ```
370    /// use timely::dataflow::Scope;
371    /// use differential_dataflow::input::Input;
372    ///
373    /// ::timely::example(|scope| {
374    ///
375    ///     let data = scope.new_collection_from(1 .. 10).1;
376    ///
377    ///     let result = scope.region(|child| {
378    ///         data.enter(child)
379    ///             .leave()
380    ///     });
381    ///
382    ///     data.assert_eq(&result);
383    /// });
384    /// ```
385    pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
386    where
387        T: Refines<<G as ScopeParent>::Timestamp>,
388    {
389        self.inner
390            .enter(child)
391            .map(|(data, time, diff)| (data, T::to_inner(time), diff))
392            .as_collection()
393    }
394
395    /// Brings a Collection into a nested scope, at varying times.
396    ///
397    /// The `initial` function indicates the time at which each element of the Collection should appear.
398    ///
399    /// # Examples
400    ///
401    /// ```
402    /// use timely::dataflow::Scope;
403    /// use differential_dataflow::input::Input;
404    ///
405    /// ::timely::example(|scope| {
406    ///
407    ///     let data = scope.new_collection_from(1 .. 10).1;
408    ///
409    ///     let result = scope.iterative::<u64,_,_>(|child| {
410    ///         data.enter_at(child, |x| *x)
411    ///             .leave()
412    ///     });
413    ///
414    ///     data.assert_eq(&result);
415    /// });
416    /// ```
417    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
418    where
419        T: Timestamp+Hash,
420        F: FnMut(&D) -> T + Clone + 'static,
421        G::Timestamp: Hash,
422    {
423        self.inner
424            .enter(child)
425            .map(move |(data, time, diff)| {
426                let new_time = Product::new(time, initial(&data));
427                (data, new_time, diff)
428            })
429            .as_collection()
430    }
431
432    /// Delays each difference by a supplied function.
433    ///
434    /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
435    /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
436    /// ordered, they should have the same order once `func` is applied to them (this is because we advance the
437    /// timely capability with the same logic, and it must remain `less_equal` to all of the data timestamps).
438    pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
439    where F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static {
440
441        let mut func1 = func.clone();
442        let mut func2 = func.clone();
443
444        self.inner
445            .delay_batch(move |x| func1(x))
446            .map_in_place(move |x| x.1 = func2(&x.1))
447            .as_collection()
448    }
449    /// Applies a supplied function to each update.
450    ///
451    /// This method is most commonly used to report information back to the user, often for debugging purposes.
452    /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
453    /// not guarantee that it will be called as many times as you might expect.
454    ///
455    /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
456    /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
457    /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
458    /// interesting and less intuitive, unfortunately.
459    ///
460    /// # Examples
461    ///
462    /// ```
463    /// use differential_dataflow::input::Input;
464    ///
465    /// ::timely::example(|scope| {
466    ///     scope.new_collection_from(1 .. 10).1
467    ///          .map_in_place(|x| *x *= 2)
468    ///          .filter(|x| x % 2 == 1)
469    ///          .inspect(|x| println!("error: {:?}", x));
470    /// });
471    /// ```
472    pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
473    where F: FnMut(&(D, G::Timestamp, R))+'static {
474        self.inner
475            .inspect(func)
476            .as_collection()
477    }
478    /// Applies a supplied function to each batch of updates.
479    ///
480    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
481    /// timely dataflow capability associated with the batch of updates. The observed batching depends
482    /// on how the system executes, and may vary run to run.
483    ///
484    /// # Examples
485    ///
486    /// ```
487    /// use differential_dataflow::input::Input;
488    ///
489    /// ::timely::example(|scope| {
490    ///     scope.new_collection_from(1 .. 10).1
491    ///          .map_in_place(|x| *x *= 2)
492    ///          .filter(|x| x % 2 == 1)
493    ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
494    /// });
495    /// ```
496    pub fn inspect_batch<F>(&self, mut func: F) -> Collection<G, D, R>
497    where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static {
498        self.inner
499            .inspect_batch(move |time, data| func(time, data))
500            .as_collection()
501    }
502
503    /// Assert if the collection is ever non-empty.
504    ///
505    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
506    /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
507    /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
508    /// program should indicate that this assertion never found cause to complain.
509    ///
510    /// # Examples
511    ///
512    /// ```
513    /// use differential_dataflow::input::Input;
514    ///
515    /// ::timely::example(|scope| {
516    ///     scope.new_collection_from(1 .. 10).1
517    ///          .map(|x| x * 2)
518    ///          .filter(|x| x % 2 == 1)
519    ///          .assert_empty();
520    /// });
521    /// ```
522    pub fn assert_empty(&self)
523    where D: crate::ExchangeData+Hashable,
524          R: crate::ExchangeData+Hashable + Semigroup,
525          G::Timestamp: Lattice+Ord,
526    {
527        self.consolidate()
528            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
529    }
530}
531
532use timely::dataflow::scopes::ScopeParent;
533use timely::progress::timestamp::Refines;
534
535/// Methods requiring a nested scope.
536impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
537where
538    T: Refines<<G as ScopeParent>::Timestamp>,
539{
540    /// Returns the final value of a Collection from a nested scope to its containing scope.
541    ///
542    /// # Examples
543    ///
544    /// ```
545    /// use timely::dataflow::Scope;
546    /// use differential_dataflow::input::Input;
547    ///
548    /// ::timely::example(|scope| {
549    ///
550    ///    let data = scope.new_collection_from(1 .. 10).1;
551    ///
552    ///    let result = scope.region(|child| {
553    ///         data.enter(child)
554    ///             .leave()
555    ///     });
556    ///
557    ///     data.assert_eq(&result);
558    /// });
559    /// ```
560    pub fn leave(&self) -> Collection<G, D, R> {
561        self.inner
562            .leave()
563            .map(|(data, time, diff)| (data, time.to_outer(), diff))
564            .as_collection()
565    }
566}
567
568/// Methods requiring a region as the scope.
569impl<G: Scope, D, R, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, D, R, C>
570{
571    /// Returns the value of a Collection from a nested region to its containing scope.
572    ///
573    /// This method is a specialization of `leave` to the case that of a nested region.
574    /// It removes the need for an operator that adjusts the timestamp.
575    pub fn leave_region(&self) -> Collection<G, D, R, C> {
576        self.inner
577            .leave()
578            .as_collection()
579    }
580}
581
582/// Methods requiring an Abelian difference, to support negation.
583impl<G: Scope, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> where G::Timestamp: Data {
584    /// Assert if the collections are ever different.
585    ///
586    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
587    /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
588    /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
589    /// indicate that this assertion never found cause to complain.
590    ///
591    /// # Examples
592    ///
593    /// ```
594    /// use differential_dataflow::input::Input;
595    ///
596    /// ::timely::example(|scope| {
597    ///
598    ///     let data = scope.new_collection_from(1 .. 10).1;
599    ///
600    ///     let odds = data.filter(|x| x % 2 == 1);
601    ///     let evens = data.filter(|x| x % 2 == 0);
602    ///
603    ///     odds.concat(&evens)
604    ///         .assert_eq(&data);
605    /// });
606    /// ```
607    pub fn assert_eq(&self, other: &Self)
608    where D: crate::ExchangeData+Hashable,
609          R: crate::ExchangeData+Hashable,
610          G::Timestamp: Lattice+Ord
611    {
612        self.negate()
613            .concat(other)
614            .assert_empty();
615    }
616}
617
618/// Conversion to a differential dataflow Collection.
619pub trait AsCollection<G: Scope, D, R, C> {
620    /// Converts the type to a differential dataflow collection.
621    fn as_collection(&self) -> Collection<G, D, R, C>;
622}
623
624impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
625    fn as_collection(&self) -> Collection<G, D, R, C> {
626        Collection::<G,D,R,C>::new(self.clone())
627    }
628}
629
630/// Concatenates multiple collections.
631///
632/// This method has the effect of a sequence of calls to `concat`, but it does
633/// so in one operator rather than a chain of many operators.
634///
635/// # Examples
636///
637/// ```
638/// use differential_dataflow::input::Input;
639///
640/// ::timely::example(|scope| {
641///
642///     let data = scope.new_collection_from(1 .. 10).1;
643///
644///     let odds = data.filter(|x| x % 2 == 1);
645///     let evens = data.filter(|x| x % 2 == 0);
646///
647///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
648///         .assert_eq(&data);
649/// });
650/// ```
651pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
652where
653    G: Scope,
654    D: Data,
655    R: Semigroup+'static,
656    C: Container + Clone + 'static,
657    I: IntoIterator<Item=Collection<G, D, R, C>>,
658{
659    scope
660        .concatenate(iterator.into_iter().map(|x| x.inner))
661        .as_collection()
662}