differential_dataflow/
collection.rs

1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::{Container, Data};
14use timely::progress::Timestamp;
15use timely::order::Product;
16use timely::dataflow::scopes::{Child, child::Iterative};
17use timely::dataflow::Scope;
18use timely::dataflow::operators::*;
19use timely::dataflow::StreamCore;
20
21use crate::difference::{Semigroup, Abelian, Multiply};
22use crate::lattice::Lattice;
23use crate::hashable::Hashable;
24
25/// A mutable collection of values of type `D`
26///
27/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
28/// differential dataflow computation, you write as if the collection is a static dataset to which you
29/// apply functional transformations, creating new collections. Once your computation is written, you
30/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
31/// propagate changes through your functional computation and report the corresponding changes to the
32/// output collections.
33///
34/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
35/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
36/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
37/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
38/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
39/// defaults to) `isize`, representing changes to the occurrence count of each record.
40#[derive(Clone)]
41pub struct Collection<G: Scope, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>> {
42    /// The underlying timely dataflow stream.
43    ///
44    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
45    /// not intended to be the idiomatic way to work with the collection.
46    ///
47    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
48    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
49    /// unexpectedly.
50    pub inner: timely::dataflow::StreamCore<G, C>,
51    /// Phantom data for unreferenced `D` and `R` types.
52    phantom: std::marker::PhantomData<(D, R)>,
53}
54
55impl<G: Scope, D, R, C> Collection<G, D, R, C> {
56    /// Creates a new Collection from a timely dataflow stream.
57    ///
58    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
59    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
60    /// provides a `new_collection` method which will create a new collection for you without exposing
61    /// the underlying timely stream at all.
62    ///
63    /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
64    /// method does not check it.
65    pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
66        Collection { inner: stream, phantom: std::marker::PhantomData }
67    }
68}
69impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
70    /// Creates a new collection accumulating the contents of the two collections.
71    ///
72    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
73    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
74    /// two collections.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// use differential_dataflow::input::Input;
80    ///
81    /// ::timely::example(|scope| {
82    ///
83    ///     let data = scope.new_collection_from(1 .. 10).1;
84    ///
85    ///     let odds = data.filter(|x| x % 2 == 1);
86    ///     let evens = data.filter(|x| x % 2 == 0);
87    ///
88    ///     odds.concat(&evens)
89    ///         .assert_eq(&data);
90    /// });
91    /// ```
92    pub fn concat(&self, other: &Self) -> Self {
93        self.inner
94            .concat(&other.inner)
95            .as_collection()
96    }
97    /// Creates a new collection accumulating the contents of the two collections.
98    ///
99    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
100    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
101    /// two collections.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// use differential_dataflow::input::Input;
107    ///
108    /// ::timely::example(|scope| {
109    ///
110    ///     let data = scope.new_collection_from(1 .. 10).1;
111    ///
112    ///     let odds = data.filter(|x| x % 2 == 1);
113    ///     let evens = data.filter(|x| x % 2 == 0);
114    ///
115    ///     odds.concatenate(Some(evens))
116    ///         .assert_eq(&data);
117    /// });
118    /// ```
119    pub fn concatenate<I>(&self, sources: I) -> Self
120    where
121        I: IntoIterator<Item=Self>
122    {
123        self.inner
124            .concatenate(sources.into_iter().map(|x| x.inner))
125            .as_collection()
126    }
127    // Brings a Collection into a nested region.
128    ///
129    /// This method is a specialization of `enter` to the case where the nested scope is a region.
130    /// It removes the need for an operator that adjusts the timestamp.
131    pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C> {
132        self.inner
133            .enter(child)
134            .as_collection()
135    }
136    /// Applies a supplied function to each batch of updates.
137    ///
138    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
139    /// timely dataflow capability associated with the batch of updates. The observed batching depends
140    /// on how the system executes, and may vary run to run.
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// use differential_dataflow::input::Input;
146    ///
147    /// ::timely::example(|scope| {
148    ///     scope.new_collection_from(1 .. 10).1
149    ///          .map_in_place(|x| *x *= 2)
150    ///          .filter(|x| x % 2 == 1)
151    ///          .inspect_container(|event| println!("event: {:?}", event));
152    /// });
153    /// ```
154    pub fn inspect_container<F>(&self, func: F) -> Self
155    where
156        F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
157    {
158        self.inner
159            .inspect_container(func)
160            .as_collection()
161    }
162    /// Attaches a timely dataflow probe to the output of a Collection.
163    ///
164    /// This probe is used to determine when the state of the Collection has stabilized and can
165    /// be read out.
166    pub fn probe(&self) -> probe::Handle<G::Timestamp> {
167        self.inner
168            .probe()
169    }
170    /// Attaches a timely dataflow probe to the output of a Collection.
171    ///
172    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
173    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
174    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
175    /// avoid swamping the system.
176    pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
177        Self::new(self.inner.probe_with(handle))
178    }
179    /// The scope containing the underlying timely dataflow stream.
180    pub fn scope(&self) -> G {
181        self.inner.scope()
182    }
183
184    /// Creates a new collection whose counts are the negation of those in the input.
185    ///
186    /// This method is most commonly used with `concat` to get those element in one collection but not another.
187    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
188    /// including negative counts.
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use differential_dataflow::input::Input;
194    ///
195    /// ::timely::example(|scope| {
196    ///
197    ///     let data = scope.new_collection_from(1 .. 10).1;
198    ///
199    ///     let odds = data.filter(|x| x % 2 == 1);
200    ///     let evens = data.filter(|x| x % 2 == 0);
201    ///
202    ///     odds.negate()
203    ///         .concat(&data)
204    ///         .assert_eq(&evens);
205    /// });
206    /// ```
207    // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
208    //       an inherent method on `Collection`.
209    pub fn negate(&self) -> Collection<G, D, R, C>
210    where 
211        StreamCore<G, C>: crate::operators::Negate<G, C>
212    {
213        crate::operators::Negate::negate(&self.inner).as_collection()
214    }
215}
216
217impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
218    /// Creates a new collection by applying the supplied function to each input element.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use differential_dataflow::input::Input;
224    ///
225    /// ::timely::example(|scope| {
226    ///     scope.new_collection_from(1 .. 10).1
227    ///          .map(|x| x * 2)
228    ///          .filter(|x| x % 2 == 1)
229    ///          .assert_empty();
230    /// });
231    /// ```
232    pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
233    where
234        D2: Data,
235        L: FnMut(D) -> D2 + 'static,
236    {
237        self.inner
238            .map(move |(data, time, delta)| (logic(data), time, delta))
239            .as_collection()
240    }
241    /// Creates a new collection by applying the supplied function to each input element.
242    ///
243    /// Although the name suggests in-place mutation, this function does not change the source collection,
244    /// but rather re-uses the underlying allocations in its implementation. The method is semantically
245    /// equivalent to `map`, but can be more efficient.
246    ///
247    /// # Examples
248    ///
249    /// ```
250    /// use differential_dataflow::input::Input;
251    ///
252    /// ::timely::example(|scope| {
253    ///     scope.new_collection_from(1 .. 10).1
254    ///          .map_in_place(|x| *x *= 2)
255    ///          .filter(|x| x % 2 == 1)
256    ///          .assert_empty();
257    /// });
258    /// ```
259    pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
260    where
261        L: FnMut(&mut D) + 'static,
262    {
263        self.inner
264            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
265            .as_collection()
266    }
267    /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
268    ///
269    /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
270    /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
271    /// attempting to consolidate the results.
272    ///
273    /// # Examples
274    ///
275    /// ```
276    /// use differential_dataflow::input::Input;
277    ///
278    /// ::timely::example(|scope| {
279    ///     scope.new_collection_from(1 .. 10).1
280    ///          .flat_map(|x| 0 .. x);
281    /// });
282    /// ```
283    pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
284    where
285        G::Timestamp: Clone,
286        I: IntoIterator<Item: Data>,
287        L: FnMut(D) -> I + 'static,
288    {
289        self.inner
290            .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
291            .as_collection()
292    }
293    /// Creates a new collection containing those input records satisfying the supplied predicate.
294    ///
295    /// # Examples
296    ///
297    /// ```
298    /// use differential_dataflow::input::Input;
299    ///
300    /// ::timely::example(|scope| {
301    ///     scope.new_collection_from(1 .. 10).1
302    ///          .map(|x| x * 2)
303    ///          .filter(|x| x % 2 == 1)
304    ///          .assert_empty();
305    /// });
306    /// ```
307    pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
308    where
309        L: FnMut(&D) -> bool + 'static,
310    {
311        self.inner
312            .filter(move |(data, _, _)| logic(data))
313            .as_collection()
314    }
315    /// Replaces each record with another, with a new difference type.
316    ///
317    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
318    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
319    ///
320    /// # Examples
321    ///
322    /// ```
323    /// use differential_dataflow::input::Input;
324    ///
325    /// ::timely::example(|scope| {
326    ///
327    ///     let nums = scope.new_collection_from(0 .. 10).1;
328    ///     let x1 = nums.flat_map(|x| 0 .. x);
329    ///     let x2 = nums.map(|x| (x, 9 - x))
330    ///                  .explode(|(x,y)| Some((x,y)));
331    ///
332    ///     x1.assert_eq(&x2);
333    /// });
334    /// ```
335    pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
336    where
337        D2: Data,
338        R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
339        I: IntoIterator<Item=(D2,R2)>,
340        L: FnMut(D)->I+'static,
341    {
342        self.inner
343            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
344            .as_collection()
345    }
346
347    /// Joins each record against a collection defined by the function `logic`.
348    ///
349    /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
350    /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
351    /// modifications made to the results, namely joining timestamps and multiplying differences.
352    ///
353    /// #Examples
354    ///
355    /// ```
356    /// use differential_dataflow::input::Input;
357    ///
358    /// ::timely::example(|scope| {
359    ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
360    ///     // for x from 0 through 9.
361    ///     scope.new_collection_from(0 .. 10isize).1
362    ///          .join_function(|x|
363    ///              //   data      time      diff
364    ///              vec![(2*x, (3*x) as u64,  x),
365    ///                   (2*x, (4*x) as u64, -x)]
366    ///           );
367    /// });
368    /// ```
369    pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
370    where
371        G::Timestamp: Lattice,
372        D2: Data,
373        R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
374        I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
375        L: FnMut(D)->I+'static,
376    {
377        self.inner
378            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
379            .as_collection()
380    }
381
382    /// Brings a Collection into a nested scope.
383    ///
384    /// # Examples
385    ///
386    /// ```
387    /// use timely::dataflow::Scope;
388    /// use differential_dataflow::input::Input;
389    ///
390    /// ::timely::example(|scope| {
391    ///
392    ///     let data = scope.new_collection_from(1 .. 10).1;
393    ///
394    ///     let result = scope.region(|child| {
395    ///         data.enter(child)
396    ///             .leave()
397    ///     });
398    ///
399    ///     data.assert_eq(&result);
400    /// });
401    /// ```
402    pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
403    where
404        T: Refines<<G as ScopeParent>::Timestamp>,
405    {
406        self.inner
407            .enter(child)
408            .map(|(data, time, diff)| (data, T::to_inner(time), diff))
409            .as_collection()
410    }
411
412    /// Brings a Collection into a nested scope, at varying times.
413    ///
414    /// The `initial` function indicates the time at which each element of the Collection should appear.
415    ///
416    /// # Examples
417    ///
418    /// ```
419    /// use timely::dataflow::Scope;
420    /// use differential_dataflow::input::Input;
421    ///
422    /// ::timely::example(|scope| {
423    ///
424    ///     let data = scope.new_collection_from(1 .. 10).1;
425    ///
426    ///     let result = scope.iterative::<u64,_,_>(|child| {
427    ///         data.enter_at(child, |x| *x)
428    ///             .leave()
429    ///     });
430    ///
431    ///     data.assert_eq(&result);
432    /// });
433    /// ```
434    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
435    where
436        T: Timestamp+Hash,
437        F: FnMut(&D) -> T + Clone + 'static,
438        G::Timestamp: Hash,
439    {
440        self.inner
441            .enter(child)
442            .map(move |(data, time, diff)| {
443                let new_time = Product::new(time, initial(&data));
444                (data, new_time, diff)
445            })
446            .as_collection()
447    }
448
449    /// Delays each difference by a supplied function.
450    ///
451    /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
452    /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
453    /// ordered, they should have the same order or compare equal once `func` is applied to them (this
454    /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
455    /// to all of the data timestamps).
456    pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
457    where
458        F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
459    {
460        let mut func1 = func.clone();
461        let mut func2 = func.clone();
462
463        self.inner
464            .delay_batch(move |x| func1(x))
465            .map_in_place(move |x| x.1 = func2(&x.1))
466            .as_collection()
467    }
468
469    /// Applies a supplied function to each update.
470    ///
471    /// This method is most commonly used to report information back to the user, often for debugging purposes.
472    /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
473    /// not guarantee that it will be called as many times as you might expect.
474    ///
475    /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
476    /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
477    /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
478    /// interesting and less intuitive, unfortunately.
479    ///
480    /// # Examples
481    ///
482    /// ```
483    /// use differential_dataflow::input::Input;
484    ///
485    /// ::timely::example(|scope| {
486    ///     scope.new_collection_from(1 .. 10).1
487    ///          .map_in_place(|x| *x *= 2)
488    ///          .filter(|x| x % 2 == 1)
489    ///          .inspect(|x| println!("error: {:?}", x));
490    /// });
491    /// ```
492    pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
493    where
494        F: FnMut(&(D, G::Timestamp, R))+'static,
495    {
496        self.inner
497            .inspect(func)
498            .as_collection()
499    }
500    /// Applies a supplied function to each batch of updates.
501    ///
502    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
503    /// timely dataflow capability associated with the batch of updates. The observed batching depends
504    /// on how the system executes, and may vary run to run.
505    ///
506    /// # Examples
507    ///
508    /// ```
509    /// use differential_dataflow::input::Input;
510    ///
511    /// ::timely::example(|scope| {
512    ///     scope.new_collection_from(1 .. 10).1
513    ///          .map_in_place(|x| *x *= 2)
514    ///          .filter(|x| x % 2 == 1)
515    ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
516    /// });
517    /// ```
518    pub fn inspect_batch<F>(&self, mut func: F) -> Collection<G, D, R>
519    where
520        F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
521    {
522        self.inner
523            .inspect_batch(move |time, data| func(time, data))
524            .as_collection()
525    }
526
527    /// Assert if the collection is ever non-empty.
528    ///
529    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
530    /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
531    /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
532    /// program should indicate that this assertion never found cause to complain.
533    ///
534    /// # Examples
535    ///
536    /// ```
537    /// use differential_dataflow::input::Input;
538    ///
539    /// ::timely::example(|scope| {
540    ///     scope.new_collection_from(1 .. 10).1
541    ///          .map(|x| x * 2)
542    ///          .filter(|x| x % 2 == 1)
543    ///          .assert_empty();
544    /// });
545    /// ```
546    pub fn assert_empty(&self)
547    where
548        D: crate::ExchangeData+Hashable,
549        R: crate::ExchangeData+Hashable + Semigroup,
550        G::Timestamp: Lattice+Ord,
551    {
552        self.consolidate()
553            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
554    }
555}
556
557use timely::dataflow::scopes::ScopeParent;
558use timely::progress::timestamp::Refines;
559
560/// Methods requiring a nested scope.
561impl<'a, G: Scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<Child<'a, G, T>, D, R>
562where
563    T: Refines<<G as ScopeParent>::Timestamp>,
564{
565    /// Returns the final value of a Collection from a nested scope to its containing scope.
566    ///
567    /// # Examples
568    ///
569    /// ```
570    /// use timely::dataflow::Scope;
571    /// use differential_dataflow::input::Input;
572    ///
573    /// ::timely::example(|scope| {
574    ///
575    ///    let data = scope.new_collection_from(1 .. 10).1;
576    ///
577    ///    let result = scope.region(|child| {
578    ///         data.enter(child)
579    ///             .leave()
580    ///     });
581    ///
582    ///     data.assert_eq(&result);
583    /// });
584    /// ```
585    pub fn leave(&self) -> Collection<G, D, R> {
586        self.inner
587            .leave()
588            .map(|(data, time, diff)| (data, time.to_outer(), diff))
589            .as_collection()
590    }
591}
592
593/// Methods requiring a region as the scope.
594impl<G: Scope, D, R, C: Container+Data> Collection<Child<'_, G, G::Timestamp>, D, R, C>
595{
596    /// Returns the value of a Collection from a nested region to its containing scope.
597    ///
598    /// This method is a specialization of `leave` to the case that of a nested region.
599    /// It removes the need for an operator that adjusts the timestamp.
600    pub fn leave_region(&self) -> Collection<G, D, R, C> {
601        self.inner
602            .leave()
603            .as_collection()
604    }
605}
606
607/// Methods requiring an Abelian difference, to support negation.
608impl<G: Scope<Timestamp: Data>, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> {
609    /// Assert if the collections are ever different.
610    ///
611    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
612    /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
613    /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
614    /// indicate that this assertion never found cause to complain.
615    ///
616    /// # Examples
617    ///
618    /// ```
619    /// use differential_dataflow::input::Input;
620    ///
621    /// ::timely::example(|scope| {
622    ///
623    ///     let data = scope.new_collection_from(1 .. 10).1;
624    ///
625    ///     let odds = data.filter(|x| x % 2 == 1);
626    ///     let evens = data.filter(|x| x % 2 == 0);
627    ///
628    ///     odds.concat(&evens)
629    ///         .assert_eq(&data);
630    /// });
631    /// ```
632    pub fn assert_eq(&self, other: &Self)
633    where
634        D: crate::ExchangeData+Hashable,
635        R: crate::ExchangeData+Hashable,
636        G::Timestamp: Lattice+Ord,
637    {
638        self.negate()
639            .concat(other)
640            .assert_empty();
641    }
642}
643
644/// Conversion to a differential dataflow Collection.
645pub trait AsCollection<G: Scope, D, R, C> {
646    /// Converts the type to a differential dataflow collection.
647    fn as_collection(&self) -> Collection<G, D, R, C>;
648}
649
650impl<G: Scope, D, R, C: Clone> AsCollection<G, D, R, C> for StreamCore<G, C> {
651    /// Converts the type to a differential dataflow collection.
652    ///
653    /// By calling this method, you guarantee that the timestamp invariant (as documented on
654    /// [Collection]) is upheld. This method will not check it.
655    fn as_collection(&self) -> Collection<G, D, R, C> {
656        Collection::<G,D,R,C>::new(self.clone())
657    }
658}
659
660/// Concatenates multiple collections.
661///
662/// This method has the effect of a sequence of calls to `concat`, but it does
663/// so in one operator rather than a chain of many operators.
664///
665/// # Examples
666///
667/// ```
668/// use differential_dataflow::input::Input;
669///
670/// ::timely::example(|scope| {
671///
672///     let data = scope.new_collection_from(1 .. 10).1;
673///
674///     let odds = data.filter(|x| x % 2 == 1);
675///     let evens = data.filter(|x| x % 2 == 0);
676///
677///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
678///         .assert_eq(&data);
679/// });
680/// ```
681pub fn concatenate<G, D, R, C, I>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
682where
683    G: Scope,
684    D: Data,
685    R: Semigroup + 'static,
686    C: Container,
687    I: IntoIterator<Item=Collection<G, D, R, C>>,
688{
689    scope
690        .concatenate(iterator.into_iter().map(|x| x.inner))
691        .as_collection()
692}