differential_dataflow/
collection.rs

//! Types and traits associated with collections of data.
//!
//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
//!
//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
//! one collection into another, using operators defined on collections. This contrasts with a more
//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::Container;
14use timely::Data;
15use timely::progress::Timestamp;
16use timely::order::Product;
17use timely::dataflow::scopes::{Child, child::Iterative};
18use timely::dataflow::Scope;
19use timely::dataflow::operators::*;
20use timely::dataflow::StreamCore;
21
22use crate::difference::{Semigroup, Abelian, Multiply};
23use crate::lattice::Lattice;
24use crate::hashable::Hashable;
25
26/// A mutable collection of values of type `D`
27///
28/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
29/// differential dataflow computation, you write as if the collection is a static dataset to which you
30/// apply functional transformations, creating new collections. Once your computation is written, you
31/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
32/// propagate changes through your functional computation and report the corresponding changes to the
33/// output collections.
34///
35/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
36/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
37/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
38/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
39/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
40/// defaults to) `isize`, representing changes to the occurrence count of each record.
41#[derive(Clone)]
42pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
43    /// The underlying timely dataflow stream.
44    ///
45    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
46    /// not intended to be the idiomatic way to work with the collection.
47    ///
48    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
49    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
50    /// unexpectedly.
51    pub inner: timely::dataflow::StreamCore<G, C>,
52    /// Phantom data for unreferenced `D` and `R` types.
53    phantom: std::marker::PhantomData<(D, R)>,
54}
55
56impl<G: Scope, D, R, C> Collection<G, D, R, C> {
57    /// Creates a new Collection from a timely dataflow stream.
58    ///
59    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
60    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
61    /// provides a `new_collection` method which will create a new collection for you without exposing
62    /// the underlying timely stream at all.
63    ///
64    /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
65    /// method does not check it.
66    pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
67        Collection { inner: stream, phantom: std::marker::PhantomData }
68    }
69}
70impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
71    /// Creates a new collection accumulating the contents of the two collections.
72    ///
73    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
74    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
75    /// two collections.
76    ///
77    /// # Examples
78    ///
79    /// ```
80    /// use differential_dataflow::input::Input;
81    ///
82    /// ::timely::example(|scope| {
83    ///
84    ///     let data = scope.new_collection_from(1 .. 10).1;
85    ///
86    ///     let odds = data.filter(|x| x % 2 == 1);
87    ///     let evens = data.filter(|x| x % 2 == 0);
88    ///
89    ///     odds.concat(&evens)
90    ///         .assert_eq(&data);
91    /// });
92    /// ```
93    pub fn concat(&self, other: &Self) -> Self {
94        self.inner
95            .concat(&other.inner)
96            .as_collection()
97    }
98    /// Creates a new collection accumulating the contents of the two collections.
99    ///
100    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
101    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
102    /// two collections.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// use differential_dataflow::input::Input;
108    ///
109    /// ::timely::example(|scope| {
110    ///
111    ///     let data = scope.new_collection_from(1 .. 10).1;
112    ///
113    ///     let odds = data.filter(|x| x % 2 == 1);
114    ///     let evens = data.filter(|x| x % 2 == 0);
115    ///
116    ///     odds.concatenate(Some(evens))
117    ///         .assert_eq(&data);
118    /// });
119    /// ```
120    pub fn concatenate<I>(&self, sources: I) -> Self
121    where
122        I: IntoIterator<Item=Self>
123    {
124        self.inner
125            .concatenate(sources.into_iter().map(|x| x.inner))
126            .as_collection()
127    }
128    // Brings a Collection into a nested region.
129    ///
130    /// This method is a specialization of `enter` to the case where the nested scope is a region.
131    /// It removes the need for an operator that adjusts the timestamp.
132    pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
133        self.inner
134            .enter(child)
135            .as_collection()
136    }
137    /// Applies a supplied function to each batch of updates.
138    ///
139    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
140    /// timely dataflow capability associated with the batch of updates. The observed batching depends
141    /// on how the system executes, and may vary run to run.
142    ///
143    /// # Examples
144    ///
145    /// ```
146    /// use differential_dataflow::input::Input;
147    ///
148    /// ::timely::example(|scope| {
149    ///     scope.new_collection_from(1 .. 10).1
150    ///          .map_in_place(|x| *x *= 2)
151    ///          .filter(|x| x % 2 == 1)
152    ///          .inspect_container(|event| println!("event: {:?}", event));
153    /// });
154    /// ```
155    pub fn inspect_container<F>(&self, func: F) -> Self
156    where
157        F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
158    {
159        self.inner
160            .inspect_container(func)
161            .as_collection()
162    }
163    /// Attaches a timely dataflow probe to the output of a Collection.
164    ///
165    /// This probe is used to determine when the state of the Collection has stabilized and can
166    /// be read out.
167    pub fn probe(&self) -> probe::Handle<G::Timestamp> {
168        self.inner
169            .probe()
170    }
171    /// Attaches a timely dataflow probe to the output of a Collection.
172    ///
173    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
174    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
175    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
176    /// avoid swamping the system.
177    pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
178        Self::new(self.inner.probe_with(handle))
179    }
180    /// The scope containing the underlying timely dataflow stream.
181    pub fn scope(&self) -> G {
182        self.inner.scope()
183    }
184
185    /// Creates a new collection whose counts are the negation of those in the input.
186    ///
187    /// This method is most commonly used with `concat` to get those element in one collection but not another.
188    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
189    /// including negative counts.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// use differential_dataflow::input::Input;
195    ///
196    /// ::timely::example(|scope| {
197    ///
198    ///     let data = scope.new_collection_from(1 .. 10).1;
199    ///
200    ///     let odds = data.filter(|x| x % 2 == 1);
201    ///     let evens = data.filter(|x| x % 2 == 0);
202    ///
203    ///     odds.negate()
204    ///         .concat(&data)
205    ///         .assert_eq(&evens);
206    /// });
207    /// ```
208    // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
209    //       an inherent method on `Collection`.
210    pub fn negate(&self) -> Collection<G, D, R, C>
211    where 
212        StreamCore<G, C>: crate::operators::Negate<G, C>
213    {
214        crate::operators::Negate::negate(&self.inner).as_collection()
215    }
216}
217
218impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
219    /// Creates a new collection by applying the supplied function to each input element.
220    ///
221    /// # Examples
222    ///
223    /// ```
224    /// use differential_dataflow::input::Input;
225    ///
226    /// ::timely::example(|scope| {
227    ///     scope.new_collection_from(1 .. 10).1
228    ///          .map(|x| x * 2)
229    ///          .filter(|x| x % 2 == 1)
230    ///          .assert_empty();
231    /// });
232    /// ```
233    pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
234    where
235        D2: Data,
236        L: FnMut(D) -> D2 + 'static,
237    {
238        self.inner
239            .map(move |(data, time, delta)| (logic(data), time, delta))
240            .as_collection()
241    }
242    /// Creates a new collection by applying the supplied function to each input element.
243    ///
244    /// Although the name suggests in-place mutation, this function does not change the source collection,
245    /// but rather re-uses the underlying allocations in its implementation. The method is semantically
246    /// equivalent to `map`, but can be more efficient.
247    ///
248    /// # Examples
249    ///
250    /// ```
251    /// use differential_dataflow::input::Input;
252    ///
253    /// ::timely::example(|scope| {
254    ///     scope.new_collection_from(1 .. 10).1
255    ///          .map_in_place(|x| *x *= 2)
256    ///          .filter(|x| x % 2 == 1)
257    ///          .assert_empty();
258    /// });
259    /// ```
260    pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
261    where
262        L: FnMut(&mut D) + 'static,
263    {
264        self.inner
265            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
266            .as_collection()
267    }
268    /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
269    ///
270    /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
271    /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
272    /// attempting to consolidate the results.
273    ///
274    /// # Examples
275    ///
276    /// ```
277    /// use differential_dataflow::input::Input;
278    ///
279    /// ::timely::example(|scope| {
280    ///     scope.new_collection_from(1 .. 10).1
281    ///          .flat_map(|x| 0 .. x);
282    /// });
283    /// ```
284    pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
285    where
286        G::Timestamp: Clone,
287        I: IntoIterator<Item: Data>,
288        L: FnMut(D) -> I + 'static,
289    {
290        self.inner
291            .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
292            .as_collection()
293    }
294    /// Creates a new collection containing those input records satisfying the supplied predicate.
295    ///
296    /// # Examples
297    ///
298    /// ```
299    /// use differential_dataflow::input::Input;
300    ///
301    /// ::timely::example(|scope| {
302    ///     scope.new_collection_from(1 .. 10).1
303    ///          .map(|x| x * 2)
304    ///          .filter(|x| x % 2 == 1)
305    ///          .assert_empty();
306    /// });
307    /// ```
308    pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
309    where
310        L: FnMut(&D) -> bool + 'static,
311    {
312        self.inner
313            .filter(move |(data, _, _)| logic(data))
314            .as_collection()
315    }
316    /// Replaces each record with another, with a new difference type.
317    ///
318    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
319    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// use differential_dataflow::input::Input;
325    ///
326    /// ::timely::example(|scope| {
327    ///
328    ///     let nums = scope.new_collection_from(0 .. 10).1;
329    ///     let x1 = nums.flat_map(|x| 0 .. x);
330    ///     let x2 = nums.map(|x| (x, 9 - x))
331    ///                  .explode(|(x,y)| Some((x,y)));
332    ///
333    ///     x1.assert_eq(&x2);
334    /// });
335    /// ```
336    pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
337    where
338        D2: Data,
339        R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
340        I: IntoIterator<Item=(D2,R2)>,
341        L: FnMut(D)->I+'static,
342    {
343        self.inner
344            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
345            .as_collection()
346    }
347
348    /// Joins each record against a collection defined by the function `logic`.
349    ///
350    /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
351    /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
352    /// modifications made to the results, namely joining timestamps and multiplying differences.
353    ///
354    /// #Examples
355    ///
356    /// ```
357    /// use differential_dataflow::input::Input;
358    ///
359    /// ::timely::example(|scope| {
360    ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
361    ///     // for x from 0 through 9.
362    ///     scope.new_collection_from(0 .. 10isize).1
363    ///          .join_function(|x|
364    ///              //   data      time      diff
365    ///              vec![(2*x, (3*x) as u64,  x),
366    ///                   (2*x, (4*x) as u64, -x)]
367    ///           );
368    /// });
369    /// ```
370    pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
371    where
372        G::Timestamp: Lattice,
373        D2: Data,
374        R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
375        I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
376        L: FnMut(D)->I+'static,
377    {
378        self.inner
379            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
380            .as_collection()
381    }
382
383    /// Brings a Collection into a nested scope.
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// use timely::dataflow::Scope;
389    /// use differential_dataflow::input::Input;
390    ///
391    /// ::timely::example(|scope| {
392    ///
393    ///     let data = scope.new_collection_from(1 .. 10).1;
394    ///
395    ///     let result = scope.region(|child| {
396    ///         data.enter(child)
397    ///             .leave()
398    ///     });
399    ///
400    ///     data.assert_eq(&result);
401    /// });
402    /// ```
403    pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
404    where
405        T: Refines<<G as ScopeParent>::Timestamp>,
406    {
407        self.inner
408            .enter(child)
409            .map(|(data, time, diff)| (data, T::to_inner(time), diff))
410            .as_collection()
411    }
412
413    /// Brings a Collection into a nested scope, at varying times.
414    ///
415    /// The `initial` function indicates the time at which each element of the Collection should appear.
416    ///
417    /// # Examples
418    ///
419    /// ```
420    /// use timely::dataflow::Scope;
421    /// use differential_dataflow::input::Input;
422    ///
423    /// ::timely::example(|scope| {
424    ///
425    ///     let data = scope.new_collection_from(1 .. 10).1;
426    ///
427    ///     let result = scope.iterative::<u64,_,_>(|child| {
428    ///         data.enter_at(child, |x| *x)
429    ///             .leave()
430    ///     });
431    ///
432    ///     data.assert_eq(&result);
433    /// });
434    /// ```
435    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
436    where
437        T: Timestamp+Hash,
438        F: FnMut(&D) -> T + Clone + 'static,
439        G::Timestamp: Hash,
440    {
441        self.inner
442            .enter(child)
443            .map(move |(data, time, diff)| {
444                let new_time = Product::new(time, initial(&data));
445                (data, new_time, diff)
446            })
447            .as_collection()
448    }
449
450    /// Delays each difference by a supplied function.
451    ///
452    /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
453    /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
454    /// ordered, they should have the same order or compare equal once `func` is applied to them (this
455    /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
456    /// to all of the data timestamps).
457    pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
458    where
459        F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
460    {
461        let mut func1 = func.clone();
462        let mut func2 = func.clone();
463
464        self.inner
465            .delay_batch(move |x| func1(x))
466            .map_in_place(move |x| x.1 = func2(&x.1))
467            .as_collection()
468    }
469
470    /// Applies a supplied function to each update.
471    ///
472    /// This method is most commonly used to report information back to the user, often for debugging purposes.
473    /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
474    /// not guarantee that it will be called as many times as you might expect.
475    ///
476    /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
477    /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
478    /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
479    /// interesting and less intuitive, unfortunately.
480    ///
481    /// # Examples
482    ///
483    /// ```
484    /// use differential_dataflow::input::Input;
485    ///
486    /// ::timely::example(|scope| {
487    ///     scope.new_collection_from(1 .. 10).1
488    ///          .map_in_place(|x| *x *= 2)
489    ///          .filter(|x| x % 2 == 1)
490    ///          .inspect(|x| println!("error: {:?}", x));
491    /// });
492    /// ```
493    pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
494    where
495        F: FnMut(&(D, G::Timestamp, R))+'static,
496    {
497        self.inner
498            .inspect(func)
499            .as_collection()
500    }
501    /// Applies a supplied function to each batch of updates.
502    ///
503    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
504    /// timely dataflow capability associated with the batch of updates. The observed batching depends
505    /// on how the system executes, and may vary run to run.
506    ///
507    /// # Examples
508    ///
509    /// ```
510    /// use differential_dataflow::input::Input;
511    ///
512    /// ::timely::example(|scope| {
513    ///     scope.new_collection_from(1 .. 10).1
514    ///          .map_in_place(|x| *x *= 2)
515    ///          .filter(|x| x % 2 == 1)
516    ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
517    /// });
518    /// ```
519    pub fn inspect_batch<F>(&self, mut func: F) -> Collection<G, D, R>
520    where
521        F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
522    {
523        self.inner
524            .inspect_batch(move |time, data| func(time, data))
525            .as_collection()
526    }
527
528    /// Assert if the collection is ever non-empty.
529    ///
530    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
531    /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
532    /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
533    /// program should indicate that this assertion never found cause to complain.
534    ///
535    /// # Examples
536    ///
537    /// ```
538    /// use differential_dataflow::input::Input;
539    ///
540    /// ::timely::example(|scope| {
541    ///     scope.new_collection_from(1 .. 10).1
542    ///          .map(|x| x * 2)
543    ///          .filter(|x| x % 2 == 1)
544    ///          .assert_empty();
545    /// });
546    /// ```
547    pub fn assert_empty(&self)
548    where
549        D: crate::ExchangeData+Hashable,
550        R: crate::ExchangeData+Hashable + Semigroup,
551        G::Timestamp: Lattice+Ord,
552    {
553        self.consolidate()
554            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
555    }
556}
557
558use timely::dataflow::scopes::ScopeParent;
559use timely::progress::timestamp::Refines;
560
561/// Methods requiring a nested scope.
562impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
563where
564    T: Refines<<G as ScopeParent>::Timestamp>,
565{
566    /// Returns the final value of a Collection from a nested scope to its containing scope.
567    ///
568    /// # Examples
569    ///
570    /// ```
571    /// use timely::dataflow::Scope;
572    /// use differential_dataflow::input::Input;
573    ///
574    /// ::timely::example(|scope| {
575    ///
576    ///    let data = scope.new_collection_from(1 .. 10).1;
577    ///
578    ///    let result = scope.region(|child| {
579    ///         data.enter(child)
580    ///             .leave()
581    ///     });
582    ///
583    ///     data.assert_eq(&result);
584    /// });
585    /// ```
586    pub fn leave(&self) -> Collection<G, D, R> {
587        self.inner
588            .leave()
589            .map(|(data, time, diff)| (data, time.to_outer(), diff))
590            .as_collection()
591    }
592}
593
594/// Methods requiring a region as the scope.
595impl<G: Scope, D, R, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, D, R, C>
596{
597    /// Returns the value of a Collection from a nested region to its containing scope.
598    ///
599    /// This method is a specialization of `leave` to the case that of a nested region.
600    /// It removes the need for an operator that adjusts the timestamp.
601    pub fn leave_region(&self) -> Collection<G, D, R, C> {
602        self.inner
603            .leave()
604            .as_collection()
605    }
606}
607
608/// Methods requiring an Abelian difference, to support negation.
609impl<G: Scope<Timestamp: Data>, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> {
610    /// Assert if the collections are ever different.
611    ///
612    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
613    /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
614    /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
615    /// indicate that this assertion never found cause to complain.
616    ///
617    /// # Examples
618    ///
619    /// ```
620    /// use differential_dataflow::input::Input;
621    ///
622    /// ::timely::example(|scope| {
623    ///
624    ///     let data = scope.new_collection_from(1 .. 10).1;
625    ///
626    ///     let odds = data.filter(|x| x % 2 == 1);
627    ///     let evens = data.filter(|x| x % 2 == 0);
628    ///
629    ///     odds.concat(&evens)
630    ///         .assert_eq(&data);
631    /// });
632    /// ```
633    pub fn assert_eq(&self, other: &Self)
634    where
635        D: crate::ExchangeData+Hashable,
636        R: crate::ExchangeData+Hashable,
637        G::Timestamp: Lattice+Ord,
638    {
639        self.negate()
640            .concat(other)
641            .assert_empty();
642    }
643}
644
645/// Conversion to a differential dataflow Collection.
646pub trait AsCollection<G: Scope, D, R, C> {
647    /// Converts the type to a differential dataflow collection.
648    fn as_collection(&self) -> Collection<G, D, R, C>;
649}
650
651impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
652    /// Converts the type to a differential dataflow collection.
653    ///
654    /// By calling this method, you guarantee that the timestamp invariant (as documented on
655    /// [Collection]) is upheld. This method will not check it.
656    fn as_collection(&self) -> Collection<G, D, R, C> {
657        Collection::<G,D,R,C>::new(self.clone())
658    }
659}
660
661/// Concatenates multiple collections.
662///
663/// This method has the effect of a sequence of calls to `concat`, but it does
664/// so in one operator rather than a chain of many operators.
665///
666/// # Examples
667///
668/// ```
669/// use differential_dataflow::input::Input;
670///
671/// ::timely::example(|scope| {
672///
673///     let data = scope.new_collection_from(1 .. 10).1;
674///
675///     let odds = data.filter(|x| x % 2 == 1);
676///     let evens = data.filter(|x| x % 2 == 0);
677///
678///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
679///         .assert_eq(&data);
680/// });
681/// ```
682pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
683where
684    G: Scope,
685    D: Data,
686    R: Semigroup + 'static,
687    C: Container + Clone + 'static,
688    I: IntoIterator<Item=Collection<G, D, R, C>>,
689{
690    scope
691        .concatenate(iterator.into_iter().map(|x| x.inner))
692        .as_collection()
693}